replication: add incremental stream compression transport#15
Closed
roshkhatri wants to merge 72 commits into
| "slave_read_only:%d\r\n", server.repl_replica_ro, | ||
| "replica_announced:%d\r\n", server.replica_announced)); | ||
| if (server.repl_decompression_errors > 0) { | ||
| info = sdscatprintf(info, "repl_decompression_errors:%zu\r\n", |
roshkhatri added a commit that referenced this pull request on May 13, 2026
sdscatfmt is faster than sdscatprintf — it skips printf-style format parsing and uses its own lightweight specifier set. Matches the pattern used elsewhere in INFO output for unsigned counters (%U + cast to unsigned long long). Addresses review comment on PR #15.
roshkhatri added a commit that referenced this pull request on May 13, 2026
Adds a CI job that re-runs all replication-tagged integration tests with replcompression=yes applied globally via ::global_overrides. Exercises the streaming-compression transport across the full replication test surface (replication, psync, dual-channel, buffer management, AOF-sync, cross-version, block-repl, replica-redirect) to catch regressions that only surface under compressed replication. Per review feedback on PR #15.
…alkey-io#3703) This test inserts keys, and as the logs show, the insertion is very slow in this CI, eventually causing a repl-timeout disconnection.

```
50655:M 14 May 2026 00:44:57.812 - DB 0: 5 keys (0 volatile) in 7 slots HT.
50655:M 14 May 2026 00:45:02.884 - DB 0: 6 keys (0 volatile) in 7 slots HT.
50655:M 14 May 2026 00:45:06.014 # Timing out slot migration xxx after not receiving ack for too long
```

Set repl-timeout for these tests to prevent the timeout disconnections. Also, the tests do not actually require inserting distinct keys; we only need to fill the replication buffer, so there is no need for different key names, which saves the CI some memory. Closes valkey-io#3702.

Signed-off-by: Binbin <binloveplay1314@qq.com>
…alkey-io#3719) ## Summary This PR addresses 2 race conditions where deferred `freeClient` (introduced by valkey-io#3324) clobbers replication state set by `REPLICAOF` commands. Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI failure in `tests/unit/wait.tcl`. ## Root Cause Since PR valkey-io#3324 ("Redesign IO threading communication model"), `freeClient()` on a primary client with pending IO is deferred via `freeClientAsync` (gated on `clientHasPendingIO`). When the deferred free eventually executes, it chains through `replicationCachePrimary()` -> `replicationHandlePrimaryDisconnection()`, which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. This causes two bugs: ### Bug 1: REPLICAOF NO ONE (SIGSEGV) `replicationUnsetPrimary()` sets `primary_host = NULL` before calling `freeClient`. The deferred free runs later, sets `repl_state = REPL_STATE_CONNECT` while `primary_host` is still NULL. `replicationCron` then calls `connectWithPrimary()` which passes NULL to `connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV. ### Bug 2: REPLICAOF newhost newport (connection leak) `replicationSetPrimary()` calls `freeClient(old_primary)` (deferred), then sets `primary_host` to the new IP and progresses `repl_state` to `REPL_STATE_CONNECTING` with a new connection handle in `server.repl_transfer_s`. The deferred free runs later, clobbers `repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls `connectWithPrimary()` again, overwriting `server.repl_transfer_s` without closing the previous connection -- an FD leak. ## Fix Make `replicationHandlePrimaryDisconnection()` only transition to `REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED` and `primary_host` is set. This means the disconnection is genuine and no other state transition has already occurred. If `repl_state` has already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is stale and the function leaves the state untouched. 
Additionally: - `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in depth). - `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test, and add dedicated tests for the repoint scenario. ## Reproduction Reproduced locally by establishing TLS replication and executing `REPLICAOF NO ONE`: - Without fix: server crashes with signal 11, accessing address 0x0 - With fix: server continues operating normally ## Testing - Full `unit/wait` test suite passes (51/51) with IO threads enabled - New tests "Repoint replica between primaries does not leak connections or crash" and "Rapid repoint does not crash or leak" pass - Crash reproduced locally over TLS (SIGSEGV without fix, graceful handling with fix) --------- Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…gtest (valkey-io#3683) release.c is compiled into valkey-cli, valkey-benchmark, and the valkeylib-gtest static library, but only valkey-server declared a dependency on the release_header custom target that runs mkreleasehdr.sh to generate src/release.h. On a clean build (no stale release.h on disk) or under high parallelism, those targets could attempt to compile release.c before the header was generated and fail with "release.h: No such file or directory". Add the missing add_dependencies(... release_header) for all three. Signed-off-by: Vadym Khoptynets <1099644+poiuj@users.noreply.github.com>
The hash-seed config is an sds string and may contain embedded NUL bytes (sdssplitargs preserves \xNN escapes inside double quotes). The old code in getHashSeedFromString() used strlen() on it, so two hash-seed values differing only after a \x00 byte collapsed to the same SipHash seed. hash-seed was added in valkey-io#2608. --------- Signed-off-by: Binbin <binloveplay1314@qq.com>
…alkey-io#3505) When a replica receives a keyless command from a client that has negotiated `CAPA REDIRECT` and has not sent `READONLY`, redirect it to the primary with a `-REDIRECT` response. Previously, keyless commands like `SCAN, DBSIZE, RANDOMKEY` always executed locally on replicas, and keyless writes like FLUSHDB, FLUSHALL returned -READONLY error with no way for clients to discover the primary. This reuses the existing `CAPA REDIRECT` capability. The capa signals that the client can handle -REDIRECT responses — it does not prescribe when redirects are sent. Clients that send READONLY opt into local replica execution and are not redirected. Only commands with CMD_READONLY or CMD_WRITE are redirected, so connection/admin commands (PING, READONLY, CLIENT) remain unaffected. EXEC with all-keyless queued commands (c->slot == -1) is treated as keyless and the transaction is discarded. --------- Signed-off-by: Yana Molodetsky <yamolodu@amazon.com> Signed-off-by: Yana Molodetsky <59420437+yanamolo@users.noreply.github.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…cations (valkey-io#3743) When a module subscribes to NOTIFY_HASH keyspace events and blocks the client in the notification callback, hexpireGenericCommand, hgetdelCommand, and hpersistCommand would trigger debugServerAssert in notifyKeyspaceEvent() because addReplyArrayLen() sets buffered_reply=1 before the notification fires. This debug assert was added in valkey-io#1819. Affected commands: HEXPIRE, HPEXPIRE, HEXPIREAT, HPEXPIREAT, HGETDEL, HPERSIST. --------- Signed-off-by: Binbin <binloveplay1314@qq.com>
…ey-io#3678) Closes valkey-io#3663. ## Why Three callers in `src/modules/lua/script_lua.c` lost error-code handling during the valkey-io#2858 move of the Lua scripting engine into a module. On the wire, `redis.pcall()` (no args), `redis.setresp(4)`, `pcall(redis.log, 1)`, `pcall(redis.log, 10, 'msg')`, and `pcall(redis.set_repl)` now return the bare message instead of `ERR <message>`. OOM and WRONGTYPE happened to keep working because their code letters survived elsewhere by accident. Reproducer: ``` $ ./src/valkey-cli eval "return redis.pcall()" 0 "Please specify at least one argument for this redis lib call" # before this PR "ERR Please specify at least one argument for this redis lib call" # with this PR ``` ## What changed In `src/modules/lua/script_lua.c`: 1. `luaPushErrorBuff` — bare-message branch (case 2): prepend `ERR ` unless the message already starts with a code letter (defensive — five existing callers hardcode `"ERR ..."` strings; double-prefixing them would also be wrong). 2. `luaProcessReplyError` push_error branch: re-add the leading `-` before delegating to `luaPushError` so the case-1 code-extraction path handles `OOM`, `WRONGTYPE`, `READONLY`, etc. 3. `errorCallback` errno==0 branch: same re-add-`-` approach. 41 LoC across the three functions. ## Test `tests/unit/scripting.tcl` regression at line 1028 covers: - All five direct-API errors above — asserts `ERR ` prefix - WRONGTYPE error path — asserts the code survives and is not double-prefixed - Runs 4 times under the `foreach is_eval × foreach script_compatibility_api` matrix Fails on `unstable`, passes with this PR. --------- Signed-off-by: 1fanwang <1fannnw@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
…#3756) This reverts the [commit](valkey-io@fdd9039) that was merged as part of the PR valkey-io#3544 due to a performance regression observed [here](valkey-io#3750) Signed-off-by: akash kumar <akumdev@amazon.com>
Update the data-size to match the automated perf benchmarking: 96 is also embedded, so move to 128 to test the non-embedded case. Also update the workflows to use the config files from `valkey-perf-benchmark` so we don't need to track the configs in two places; everything is pulled from the `valkey-perf-benchmark` source and the configs are standard across all runs. --------- Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…alkey-io#3766) The HFE-blocking-keyspace-notify test added in valkey-io#3743 is racy and intermittently fails with: ``` Expected '{event del key h1} {event hdel key h1}' to be equal to '{event hdel key h1}' (context: ... cmd {assert_equal [lsort {{event hdel key h1} {event del key h1}}] [lsort [r b_keyspace.events]]} proc ::test) ``` ## Why it races When `HGETDEL` empties the hash, `hgetdelCommand` fires two notifications back-to-back on the main thread: 1. `notifyKeyspaceEvent(NOTIFY_HASH, "hdel", ...)` 2. `notifyKeyspaceEvent(NOTIFY_GENERIC, "del", ...)` (the key was removed) The `block_keyspace_notification` test module's callback always spawns a background thread that sleeps 1s, appends to the event log, and optionally unblocks the client. For the second notification, `RM_BlockClient` returns `NULL` (the client is already blocked by the first), so the second thread never unblocks anyone — it only appends `"del"` to the log. The first thread is the one that unblocks the client, so `HGETDEL` returns as soon as that thread has logged `"hdel"`. The test client then immediately reads `b_keyspace.events`, racing the second thread which has not yet acquired the event-log mutex. On a loaded host or under valgrind the second thread routinely loses the race, leaving only `"hdel"` visible. ## Fix `wait_for_condition` until both events have been logged before asserting on their contents. This matches the pattern already used by the four other event-log assertions in this file (e.g. the `Server-created keyspace notification` and `Event that fires twice` blocks). ```tcl wait_for_condition 50 100 { [llength [r b_keyspace.events]] >= 2 } else { fail "Did not see both hdel and del events: [r b_keyspace.events]" } assert_equal [lsort {{event hdel key h1} {event del key h1}}] [lsort [r b_keyspace.events]] ``` Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Signed-off-by: Harkrishn Patro <bunty.hari@gmail.com>
…key-io#3800)

### Bug

When `ACL LOAD` deletes a user, it calls `freeClientOrCloseLater()` on clients authenticated as that user. If `freeClient()` defers the actual free (because the client has `flag.protected` set or has pending IO), the client remains in `server.clients` with `c->user` still pointing to the old user object. After `raxFreeWithCallback(old_users, ACLFreeUserVoid)` frees all old user objects, `c->user` becomes a dangling pointer. Any subsequent code that dereferences `c->user` on the deferred client triggers a heap-use-after-free.

### Crash sequence

1. Client A runs `ACL LOAD`
2. Client B is authenticated as a user that was removed from the ACL file
3. Client B has `flag.protected` set (e.g. it has pending IO on IO threads)
4. `ACLLoadFromFile` → `freeClientOrCloseLater(B, 0)` → `freeClient(B)` → defers to `freeClientAsync` because B is protected
5. `raxFreeWithCallback(old_users, ...)` frees all old user objects including B's user
6. B is still in `server.clients` with `c->user` pointing to freed memory
7. `CLIENT LIST` or any path that calls `catClientInfoString` dereferences `c->user->name` → **UAF**

### Fix

Set `c->user` to default user before calling `freeClientOrCloseLater()` in `ACLLoadFromFile`. The existing NULL guard in `catClientInfoString` (`client->user ? client->user->name : "(superuser)"`) handles this safely.

### Test

Added `DEBUG PROTECT-CLIENT <client-id>` subcommand that calls `protectClient()` on the target, forcing `freeClient` to defer to async. This enables deterministic reproduction without timing races. The test:

1. Creates a user, connects a client as that user
2. Protects the client via `DEBUG PROTECT-CLIENT`
3. Removes the user from the ACL file and runs `ACL LOAD`
4. Runs `CLIENT LIST` which dereferences `c->user->name`

Without the fix, this crashes under ASAN with:

```
ERROR: AddressSanitizer: heap-use-after-free in catClientInfoString
READ of size 8
freed by: raxRecursiveFree → raxFreeWithCallback → ACLLoadFromFile
```

---------

Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
…alkey-io#3723) 1. Fix double-call to finishSlotMigrationJob in clusterUpdateSlotImportsOnOwnershipChange(): when n == myself, the function was called once with 'assigned to myself' message, then fell through to call it again with 'no longer owned by source'. Added else branch to make the two error paths mutually exclusive. 2. Fix RESP protocol violation in clusterCommandSyncSlotsFinish(): addReply(c, shared.ok) was sent unconditionally before validating the job name and state, so on error the client would receive both +OK and -ERR. Moved the OK reply after all validations pass. Note that it is an internal command and the only clients sending it are primary to replica connections and fake AOF clients. Both of those turn off replies, so the reply doesn't actually get parsed and therefore the violation has no effect. But good to clean it up Signed-off-by: chx9 <lovelypiska@outlook.com>
Collapse the save-notice if/else into one serverLog that always names the active algorithm (none/lzf/lz4), instead of branching the message. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Replace em dashes in the compression comments with ASCII punctuation and add a short note above streamWriterCreate describing the writer/reader push/pull model and the finish-before-free requirement. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…ey-io#3848) Harden isValidAuxChar() to reject control characters (0x00-0x1F, 0x7F) and format-significant delimiters (comma, equals, quotes, backslash, space) that could corrupt `nodes.conf` parsing or enable injection of crafted node entries on restart. Additionally, add a validator for `cluster-announce-ip` which previously had no input validation, allowing arbitrary bytes to be persisted into the cluster config file. --------- Signed-off-by: Eran Ifrah <eifrah@amazon.com>
…#3904) ## Problem The shard ID restart tests in `cluster-shards.tcl` fail intermittently under valgrind because `pause_process` (SIGSTOP) can hit before the deferred config save is flushed to `nodes.conf`. When a replica learns its primary's shard ID via gossip, it updates `myself->shard_id` and sets `CLUSTER_TODO_SAVE_CONFIG` but the actual write only happens in `clusterBeforeSleep()` during the next event loop iteration. Under valgrind's slower execution, the test's SIGSTOP can arrive before that flush, so on restart the node gets a fresh random shard ID instead of the persisted one. ## Fix Add `CLUSTER SAVECONFIG` before pausing nodes in both restart tests, ensuring `nodes.conf` contains the current shard IDs on disk before SIGSTOP. This follows the same pattern used in `latency-monitor.tcl`. Fixes valkey-io#3883. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
This change helps speed up the clang-format job. We don't really need to upgrade every pre-installed package on the runner (firefox and a bunch of other things the job never uses). Before this change: 2m 47s. After this change: 31s. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- Simplify the streaming APIs: caller-owned structs with Init/Free and descriptive self-parameter names; codec used only for the algorithm vtable. - Store the compression algorithm id directly in the VKCS envelope and drop the separate codec enum plus its translation functions. - Make the rio decorator opaque: track connection-backed streams with a flag instead of a transport-type enum. - Route LZ4 contexts through zmalloc and assert on caller errors; keep runtime errors for corrupt or external input. - Document the feed functions' streaming contract, mark the retriable capacity-shortage returns, and drop a redundant probe re-zero. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
valkey-check-rdb reported offsets via rioTell() on every read, and rdbLoadProgressCallback computed the transport offset via rioTell() on every read even when no progress event was due. Read processed_bytes directly for the offset, and only compute the transport position when a progress event is actually reported. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…ey-io#3888) Rework commandDbIdArgs so each helper returns the argv index of every dbid argument it owns. ACLSelectorCheckCmd uses those positions to set keyidxptr directly, replacing the cmd->proc-based offset table. The caller now reads the dbid value via getLongLongFromObject(argv[positions[i]], ...). New db=-checked commands only need to implement their own *DbIdArgs helper. The fix is for COPY: the old chain fell through to 0, so the "object" field in ACL LOG always showed "copy"; it now reports the first denied dbid. Based on valkey-io#3801 (see it for more details); DB ACL was added in valkey-io#2309. Signed-off-by: Binbin <binloveplay1314@qq.com>
…lision (valkey-io#3925) ## Problem The per-library function dict (`libraryFunctionDictType`) compares function names **case-sensitively**, while the global function dict (`functionDictType`) compares them **case-insensitively** (`dictCStrCaseHash` + `dictSdsKeyCaseCompare`). This asymmetry lets a single library register two functions whose names differ only in case, leaving the two dicts inconsistent and crashing the server on the next unlink. ### Reproducer (any client with the default `SCRIPTING` ACL category) ``` FUNCTION LOAD "#!lua name=poc redis.register_function('aaa', function() return 1 end) redis.register_function('AAA', function() return 2 end)" FUNCTION DELETE poc ``` The server aborts with: ``` === ASSERTION FAILED === ==> functions.c:332 'ret == DICT_OK' is not true 0 valkey-server ... libraryUnlink + ... ``` ### Mechanism 1. `FUNCTION LOAD` — `functionLibCreateFunction` checks for duplicates in the per-library dict case-sensitively, so `aaa` and `AAA` are both stored there. `libraryLink` then inserts both into the global dict, but the second `dictAdd` silently returns `DICT_ERR` (return value ignored) because the global dict is case-insensitive. The load-path global-collision check iterates the per-library functions and looks each up in the global dict, but since `AAA` was never inserted there it finds no conflict and the load succeeds. 2. `FUNCTION DELETE` — `libraryUnlink` iterates the per-library dict (2 entries) and deletes each from the global dict. The first delete (`aaa`) succeeds; the second (`AAA`) resolves to the same case-insensitive slot, already removed, so `dictDelete` returns `DICT_ERR` and `serverAssert(ret == DICT_OK)` at `functions.c:332` aborts the server. `FUNCTION LOAD REPLACE` of an affected library hits the same path, since it calls `libraryUnlink` on the old library internally. Requires only the `SCRIPTING` ACL category, granted to users by default. No admin, cluster, or special config needed. 
## Fix Make `libraryFunctionDictType` case-insensitive (`dictCStrCaseHash` + `dictSdsKeyCaseCompare`) to match the global dict. Names differing only in case now collide at registration, and `FUNCTION LOAD` is rejected cleanly with `Function already exists in the library`. This is consistent with the long-standing case-insensitive `FCALL` contract (see the existing `FUNCTION - test function case insensitive` test). ## Testing - Reproduced the crash on `unstable` before the fix; confirmed the server stays up and `FUNCTION LOAD` is rejected after. - Added two regression tests in `tests/unit/functions.tcl`. Verified they **fail on pre-fix code** (load wrongly succeeds, delete crashes the server) and **pass after the fix**. - `unit/functions`: 122 passed, 0 failed. `unit/scripting`: 683 passed, 0 failed. Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
…nf (valkey-io#3937) Minor cleanup, now it is the real option name. Signed-off-by: Binbin <binloveplay1314@qq.com>
Replace the separate rdb-compression-algo config with a single rdbcompression no|yes|lz4-stream. 'yes' keeps the legacy per-string LZF path; 'lz4-stream' selects whole-file streaming LZ4 RDB saves. Update valkey.conf and the RDB compression, check-rdb, and replication AOF sync tests to use the new config value. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
--------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
The compression CI job (--config replcompression yes) ran replication-buffer.tcl and the dual-channel buffer-memory tests, which assert exact replication buffer/backlog memory and byte volumes. Compression legitimately changes those (per-replica codec buffers add ~1MB scratch; fewer wire bytes let replicas keep up), so the assertions fail under compression even though replication is correct. Drop the repl-compression tag from replication-buffer.tcl and the two dual-channel blocks holding the memory tests; they still run uncompressed in the regular job. Functional dual-channel coverage stays in the compression job. Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…io#3897) ## Summary Fixes valkey-io#3008 > lets add assert checking that the object has a key in dbUntrackKeyWithVolatileItems and dbTrackKeyWithVolatileItems to be able to get a more explicit error in these cases This is addressed in this PR. Signed-off-by: ydsakshi <ydsakshi023@gmail.com>
clusterNode.shard_id is a fixed-size char[CLUSTER_NAMELEN] buffer that is not guaranteed to be NUL-terminated, so it must be printed with %.40s. This was introduced in valkey-io#2510. Signed-off-by: Binbin <binloveplay1314@qq.com>
…valkey-io#3941) valkey-io#2449 made the failover delay relative to cluster-node-timeout. Now that delay = min(cluster-node-timeout / 30, 500), any cluster-node-timeout below 30, including the legal minimum 0, collapses the delay to zero, and `x % 0` is undefined behaviour. Signed-off-by: Binbin <binloveplay1314@qq.com>
…thakaggarwal97/valkey into replication-streaming-compression-pr # Conflicts: # src/compression_stream.c # src/compression_stream.h # src/config.c # src/rdb.c # src/server.h # src/unit/test_compression.cpp # tests/integration/rdb-compression.tcl # valkey.conf
…tate and plaintext passthrough Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…val (fix macOS -Werror) Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Owner, Author: Closing in favor of a fresh PR against the updated streaming-compression-rio-pr base (0f15410); GitHub had cached the old base diff.
Summary
Adds per-replica streaming compression for the incremental replication stream on top of valkey-io#3531, with `lz4` as the first supported codec. Negotiated per-replica via the existing PSYNC handshake (`REPLICA_CAPA_COMPRESSION`); default off (`replcompression no`); existing replicas without the capability stay uncompressed.

Compression runs inline on the IO thread that owns the replica's write job, with optional sticky thread affinity for cache locality on the long-lived LZ4 frame state. No dedicated compression threads, no IPC, no reordering.
Headline results (BlockMesh tweets, 3M keys × ~315 B):
Configs added:

- `replcompression` (bool, default `no`): enables compression on a primary or replica.
- `repl-compression-thread-affinity` (bool, default `yes`): pins each compressed replica to a sticky IO thread.

Related to valkey-io#3531. ZSTD support follows in valkey-io#3798.
Full description: design, threading model, configs, observability, benchmarks
Adds support for replication wire compression on top of valkey-io#3531. `lz4` is the first supported codec for the incremental replication stream. The replication stream from primary to replica can now be wrapped in a `VKCS` envelope (using `STREAM_KIND_REPL`) and compressed as a single long-lived frame at the per-replica buffer layer. Default behavior remains unchanged with `replcompression no`. Existing replicas without the new capability continue to work uncompressed.

The negotiation is per-replica via the existing PSYNC handshake; a new `REPLICA_CAPA_COMPRESSION` capability lets each side opt in independently. Compression runs off the main thread via the existing IO thread infrastructure.

Architecture
Threading Model
Primary side: compression runs on the IO thread that owns the replica's write job. There is no dedicated compression thread; the per-replica `streamWriter` state lives in `client->repl_data` and is touched only by the owning IO thread (or the main thread before any IO dispatch).

Replica side: decompression runs inline on the main thread as part of the existing `readQueryFromClient` path. No worker threads, no submit/completion queues; the decompressed bytes feed directly into `processInputBuffer`, which already lives on the main thread.

The data-flow diagram above shows what gets compressed; this section shows where the work runs. Together they describe the full path from the shared replication backlog on the primary to the apply loop on the replica.
Key Design Decisions
Capability negotiation reuses the existing PSYNC handshake. Replicas that support compression advertise `capa compression`; the primary records `REPLICA_CAPA_COMPRESSION` on the per-replica bitmask and only compresses for replicas that opted in.

One long-lived LZ4 frame per replica connection. Frame state is negotiated at handshake; replicas at different PSYNC offsets each see their own frame. Frame-done mid-stream is treated as a protocol violation and triggers disconnect.
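
The opt-in rule described above can be sketched as a simple bitmask check. This is a minimal illustration, not the PR's code: `replica_caps`, `replica_add_capa`, and `should_compress_for` are hypothetical names, and only the `REPLICA_CAPA_COMPRESSION` value (`1 << 4`, per the internal constants listed in this description) and the `capa compression` token come from the PR text.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Value taken from the PR's internal constants; everything else is illustrative. */
#define REPLICA_CAPA_COMPRESSION (1 << 4)

/* Hypothetical stand-in for the per-replica capability bitmask. */
typedef struct {
    int capa;
} replica_caps;

/* Record one capability name announced by a replica ("capa <name>").
 * Unknown names are ignored, so older primaries and replicas keep working. */
void replica_add_capa(replica_caps *r, const char *name) {
    assert(r != NULL && name != NULL);
    if (strcmp(name, "compression") == 0) r->capa |= REPLICA_CAPA_COMPRESSION;
}

/* Compress only when the local config enables it AND the replica opted in. */
bool should_compress_for(const replica_caps *r, bool replcompression_enabled) {
    assert(r != NULL);
    return replcompression_enabled && (r->capa & REPLICA_CAPA_COMPRESSION) != 0;
}
```

Under this sketch a replica that never announces the capability is always served plaintext, which matches the backward-compatibility behavior the description states.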
Per-replica `streamWriter` on the primary, singleton push-mode `streamReader` on the replica. A replica only ever has one primary at a time; per-client decoder state is unnecessary on the replica side. Push-mode `streamReader` is a new API surface added to feed the asynchronous replication read path.

Compression dispatch reuses the existing IO thread infrastructure. Compressed replica writes flow through the same `JOB_REQ_WRITE_CLIENT` path as normal client writes. Optional thread affinity (config-toggleable) routes per-replica writes to a sticky owner thread for cache locality, with a self-organizing rebalance on scale-up.

`STREAM_KIND_REPL` distinguishes replication frames from RDB frames. The replica's decoder rejects mismatched VKCS envelopes early. RDB `STREAM_KIND_RDB` payloads from "Streaming Compression support for RDB" (valkey-io/valkey#3531) are unaffected.

Replication-offset accounting stays in logical (uncompressed) bytes. `replDecompressQueryBuf` adjusts `read_reploff` by the raw→decoded delta so PSYNC offset invariants are preserved across compressed and uncompressed connections.

LZ4 fast mode (level 0) chosen for max throughput. LZ4 fast can compress at ~5 GB/s on modern x86 vs ~500 MB/s for HC modes; for replication, throughput typically dominates because the network is the bottleneck before compression CPU is. Trade-off: ~10-15% worse compression ratio than HC modes.
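
The offset adjustment described above can be illustrated with a small sketch. It assumes (this is not confirmed by the PR text) that the read path first advances the offset by the raw, compressed byte count and that the decompression step then corrects it by the raw→decoded delta. The struct and function names are hypothetical; only `read_reploff` is a name from the description.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the replica's view of its primary link. */
typedef struct {
    long long read_reploff; /* logical (uncompressed) replication offset */
} primary_link;

/* Assumed first step: the socket read path counts the raw bytes it received. */
void account_raw_read(primary_link *link, size_t raw_bytes) {
    assert(link != NULL);
    link->read_reploff += (long long)raw_bytes;
}

/* Assumed second step: after decoding, replace the raw contribution with the
 * decoded size by applying the (decoded - raw) delta. */
void adjust_for_decode(primary_link *link, size_t raw_consumed, size_t decoded_bytes) {
    assert(link != NULL);
    link->read_reploff += (long long)decoded_bytes - (long long)raw_consumed;
}
```

For example, starting at offset 1000, reading 300 compressed bytes that decode to 900 bytes leaves the offset at 1900, the same value an uncompressed link would reach after receiving those 900 bytes.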
Runtime `replcompression no` triggers `markCompressedReplicasForDisconnect`. Existing compressed replicas get an async disconnect via `freeClientAsync` since their LZ4 frames can't transition mid-link. Replicas reconnect and renegotiate plaintext if the primary stops compressing.

Thread affinity
When `repl-compression-thread-affinity` is on (default), each compressed replica sticks to a single IO thread that owns its long-lived LZ4 frame state. The design is dynamic in two ways:

- Lazy ownership. A compressed replica's `affinity_tid` starts unset (`-1`). The first IO thread that picks up its write job claims ownership: `c->repl_data->affinity_tid = my_tid`. From then on, every job for that replica is routed to that thread's private SPSC inbox (`io_private_inbox[tid]`). No upfront pinning at connect time, no main-thread bookkeeping at handshake, and no work for replicas that never accumulate enough traffic to need it.
- Event-driven rebalance on scale-up. When the IO thread pool grows under load (`active_io_threads_num` increases), `replBalanceAffinity()` runs and re-spreads existing compressed replicas across the wider pool. Without this, all replicas owned by the original few threads would stay clumped there, leaving new threads idle.

If the owner's private inbox is full, the thread is sleeping, or the IO pool has shrunk past the owner's slot, the write falls through to the shared inbox. The owner counter (`debug_thread_switches`) increments only on a real change of processing thread, so it surfaces both genuine rebalance events and graceful fallbacks for diagnostics. `affinity_tid` resets to `-1` on replica disconnect so a reconnect re-claims fresh ownership.

The benefit is L1/L2 cache locality on the per-replica `streamWriter` struct and `compressed_buf` SDS. The Performance section quantifies this: thread switches drop by 4 to 5 orders of magnitude with no measured throughput cost when affinity is disabled, so operators with cache-pressure-sensitive workloads can leave it on, and operators who prefer a pure shared-inbox model can turn it off without throughput regression.
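
The lazy-claim and fallback rules above can be sketched as follows. This is a single-threaded model for illustration only, under the assumption that routing is decided before dispatch and the claim happens when a thread picks up the job; it omits the synchronization and the rebalance step a real implementation would need. All names except `affinity_tid` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define AFFINITY_UNSET (-1)
#define SHARED_INBOX (-1) /* sentinel: route through the shared inbox */

/* Hypothetical per-replica affinity state. */
typedef struct {
    int affinity_tid;              /* owning IO thread, or AFFINITY_UNSET */
    int last_tid;                  /* thread that processed the previous job */
    unsigned long thread_switches; /* counts real changes of processing thread */
} replica_affinity;

void affinity_init(replica_affinity *r) {
    assert(r != NULL);
    r->affinity_tid = AFFINITY_UNSET;
    r->last_tid = AFFINITY_UNSET;
    r->thread_switches = 0;
}

/* Pick the inbox for the next write job: the owner's private inbox when it is
 * usable, otherwise fall through to the shared inbox. */
int affinity_route(const replica_affinity *r, int active_threads,
                   bool owner_inbox_full, bool owner_sleeping) {
    assert(r != NULL);
    if (r->affinity_tid == AFFINITY_UNSET) return SHARED_INBOX;
    if (r->affinity_tid >= active_threads) return SHARED_INBOX; /* pool shrank */
    if (owner_inbox_full || owner_sleeping) return SHARED_INBOX;
    return r->affinity_tid;
}

/* Called by the IO thread that actually picked up the job. */
void affinity_on_job(replica_affinity *r, int my_tid) {
    assert(r != NULL);
    if (r->affinity_tid == AFFINITY_UNSET) r->affinity_tid = my_tid; /* lazy claim */
    if (r->last_tid != AFFINITY_UNSET && r->last_tid != my_tid) r->thread_switches++;
    r->last_tid = my_tid;
}

/* Forget ownership so a reconnect claims a fresh owner. */
void affinity_on_disconnect(replica_affinity *r) {
    assert(r != NULL);
    r->affinity_tid = AFFINITY_UNSET;
}
```

In this model a fallback job handled by a different thread bumps the switch counter without changing the owner, which mirrors the description's point that the counter reflects both rebalances and graceful fallbacks.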
Configs
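| Config | Default | Description |
| --- | --- | --- |
| `replcompression` | `no` | Enables compression on a primary or replica. |
| `repl-compression-thread-affinity` | `yes` | Pins each compressed replica to a sticky IO thread. With `no`, all writes go through the shared inbox. |

Internal constants (not user-configurable in this PR; surfaced for reviewer context):

| Constant | Value | Notes |
| --- | --- | --- |
| `REPLICA_CAPA_COMPRESSION` | `1 << 4` | … `capa compression`. |
| `REPL_COMPRESSION_ALGO` | `ALGO_LZ4` | … `repl-compression-algo` knob can be exposed once we have benchmarks comparing other codecs. |
| `REPL_COMPRESSION_LEVEL` | `0` | |
| `REPL_COMPRESSION_BATCH_LIMIT` | `1 MB` | … `compressed_buf` size predictable. |
| `REPL_STREAM_DECODER_OUTPUT_MAX` | `256 MB` | |

Observability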
`INFO replication` per-replica fields:

- `compression=lz4`, `compressed_bytes`, `uncompressed_bytes`, `compression_ratio`, `compression_errors`
- `compression_cpu_usec`: primary-side CPU spent in compression for this replica
- `debug_compression_pending_drains`: backpressure indicator (resume-pending path counter)
- `debug_thread_switches`: count of thread changes for IO offload (diagnostic)

`INFO replication` server-level (replica side):

- `repl_decompression_errors`, `repl_decompression_cpu_usec`, `repl_decompressed_bytes_total`
- `repl_apply_cpu_usec`, `repl_apply_batches`

Performance
Tested on `r7g.4xlarge` (16 vCPU, ARM Graviton3) primary in `us-east-1` with 8 IO threads, 2 cross-region replicas in `us-west-2`, and 1 same-region uncompressed replica. Client: `c7g.2xlarge` (8 vCPU, same VPC), 30 connections, pipeline depth 50, COB limit 10.6 GB.

Test 1: BlockMesh tweets: 3M keys × ~315 byte JSON values (1,073 MB uncompressed per replica).
Test 2: CNN DailyMail: 3M keys × ~4 KB news articles (~12,260 MB uncompressed per replica), 287K unique articles shuffled across 3M keys.
Compression effectiveness and primary-side cost (lower ratio and CPU are better)
BlockMesh tweets (~315 byte values, 1,073 MB uncompressed):
CNN DailyMail (~4 KB values, ~12,260 MB uncompressed):
Primary write throughput (SET keys/sec, BlockMesh)
Compression overhead vs uncompressed baseline is <1% across all algorithms at pipeline=50, and ≤3% at pipeline=10.
Replica-side decompression cost (BlockMesh, 1,073 MB decompressed)
Higher ZSTD levels can have lower decompression CPU because fewer bytes arrive on the wire (more compressed). LZ4 decompresses at ~1.25 GB/s; ZSTD-9 at ~617 MB/s.
Thread affinity (`repl-compression-thread-affinity`)

Tested ON vs OFF at LZ4 level 0, pipeline 50, 2 cross-region replicas:
Throughput is unchanged with or without affinity. The affinity benefit is dramatically fewer thread switches for the long-lived LZ4 frame state, 4 to 5 orders of magnitude on small-value workloads. This translates to better cache locality on the IO thread that owns the LZ4 stream, with no measured throughput cost when affinity is disabled. Default is ON.
Notes
Push-mode `streamReader` API (`streamReaderCreatePush`, `streamReaderFeed`, `streamReaderFeedEnd`, `streamReaderNeedsInput`, `streamReaderFrameDone`) was added on top of the pull-mode reader in "Streaming Compression support for RDB" (valkey-io/valkey#3531). This is the API surface that lets event-loop callers feed bytes into the decoder asynchronously.

Memory accounting via `streamWriterMemUsage` includes the per-replica `streamWriter` scratch buffer in `getClientOutputBufferMemoryUsage`, so `client-output-buffer-limit` correctly bounds compressed-replica memory.

CI adds a `test-replication-compression` job that runs the replication-tagged integration tests with `replcompression yes` to exercise compression across the broader replication test surface.

The Performance section includes ZSTD level 0/3/6/9 numbers for comparison against LZ4. This PR ships LZ4 only; ZSTD support for replication compression will land in #3798, and the per-algorithm default-level scaffolding in `src/server.h` (`REPL_COMPRESSION_DEFAULT_LEVEL_LZ4`, with a commented-out `..._ZSTD` placeholder) is set up so wiring ZSTD in is additive rather than a refactor.

Related to valkey-io#3531.
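
To illustrate what "push-mode" means for an event-loop caller, here is a toy decoder that accepts input in arbitrary chunks and keeps its state between calls. It does not use or mirror the PR's `streamReader` functions, whose signatures are not shown in this description; the `toy_*` names and the run-length format (a count byte followed by a value byte) are invented purely for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy push-mode decoder state: remembers a half-received (count, value) pair
 * so that input may be split at any byte boundary. */
typedef struct {
    int have_count;      /* 1 if the count byte of the current pair has arrived */
    unsigned char count; /* pending run length */
} toy_push_reader;

void toy_reader_init(toy_push_reader *r) {
    assert(r != NULL);
    r->have_count = 0;
    r->count = 0;
}

/* Feed `len` input bytes and write decoded bytes to `out` (capacity `cap`).
 * Returns the number of decoded bytes written by this call, or (size_t)-1 if
 * `out` is too small; this sketch does not try to recover from that case. */
size_t toy_reader_feed(toy_push_reader *r, const unsigned char *in, size_t len,
                       unsigned char *out, size_t cap) {
    assert(r != NULL && in != NULL && out != NULL);
    size_t written = 0;
    for (size_t i = 0; i < len; i++) {
        if (!r->have_count) { /* first byte of a pair: the run length */
            r->count = in[i];
            r->have_count = 1;
            continue;
        }
        if (written + r->count > cap) return (size_t)-1;
        memset(out + written, in[i], r->count); /* second byte: the value */
        written += r->count;
        r->have_count = 0;
    }
    return written;
}

/* Nonzero while the decoder is waiting for the second byte of a pair. */
int toy_reader_mid_pair(const toy_push_reader *r) {
    assert(r != NULL);
    return r->have_count;
}
```

A caller would invoke `toy_reader_feed` once per readable event with whatever bytes arrived, then hand the decoded output to the next stage. The point of the design is that the caller never blocks waiting for a complete unit of input.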