Skip to content

replication: add incremental stream compression transport#15

Closed
roshkhatri wants to merge 72 commits into
streaming-compression-rio-prfrom
replication-streaming-compression-pr
Closed

replication: add incremental stream compression transport#15
roshkhatri wants to merge 72 commits into
streaming-compression-rio-prfrom
replication-streaming-compression-pr

Conversation

@roshkhatri

@roshkhatri roshkhatri commented May 6, 2026

Copy link
Copy Markdown
Owner

Summary

Adds per-replica streaming compression for the incremental replication stream on top of valkey-io#3531, with lz4 as the first supported codec. Negotiated per-replica via the existing PSYNC handshake (REPLICA_CAPA_COMPRESSION); default off (replcompression no); existing replicas without the capability stay uncompressed.

Compression runs inline on the IO thread that owns the replica's write job, with optional sticky thread affinity for cache locality on the long-lived LZ4 frame state. No dedicated compression threads, no IPC, no reordering.

Headline results (BlockMesh tweets, 3M keys × ~315 B):

Bandwidth savings Compression CPU Throughput overhead
LZ4 level 0 (default) 52% 2.5s <1%
ZSTD level 9 (future, #3798) 75% 29.7s <3%

Configs added:

  • replcompression (bool, default no): enables compression on a primary or replica.
  • repl-compression-thread-affinity (bool, default yes): pins each compressed replica to a sticky IO thread.

Related to valkey-io#3531. ZSTD support follows in valkey-io#3798.

Full description: design, threading model, configs, observability, benchmarks

Adds support for replication wire compression on top of valkey-io#3531. lz4 is the first supported codec for the incremental replication stream. The replication stream from primary to replica can now be wrapped in a VKCS envelope (using STREAM_KIND_REPL) and compressed as a single long-lived frame at the per-replica buffer layer. Default behavior remains unchanged with replcompression no. Existing replicas without the new capability continue to work uncompressed.

The negotiation is per-replica via the existing PSYNC handshake; a new REPLICA_CAPA_COMPRESSION capability lets each side opt in independently. Compression runs off the main thread via the existing IO thread infrastructure.

Architecture

+-----------------------------+       +------------------------------+       +----------------------+
|      Primary side           |       |      Per-replica state       |       |       transport      |
|                             |       |                              |       |                      |
| addReplyToReplicaBuffer     |------>| streamWriter (LZ4 frame)     |------>| TCP socket → replica |
| writeToReplicaCompressed    |       | compressed_buf (sds)         |       |                      |
+-----------------------------+       +------------------------------+       +----------------------+

+-----------------------------+       +------------------------------+       +----------------------+
|      Replica side           |       |    Server-singleton state    |       |       transport      |
|                             |       |                              |       |                      |
| readQueryFromClient         |<------| streamReader (push-mode)     |<------| TCP socket ← primary |
| replDecompressQueryBuf      |       | repl_stream_decoder          |       |                      |
+-----------------------------+       +------------------------------+       +----------------------+

Threading Model

Primary side: compression runs on the IO thread that owns the replica's write job. There is no dedicated compression thread; the per-replica streamWriter state lives in client->repl_data and is touched only by the owning IO thread (or the main thread before any IO dispatch).

Primary
+--------------------------------------------------------------------------+
|                            MAIN PROCESS                                  |
|                                                                          |
|  Thread 0 (main)           IO Thread t1            IO Thread t2 ... tN   |
|    |                        |                      |                     |
|    | command execution      |                      |                     |
|    |   |                    |                      |                     |
|    |   v                    |                      |                     |
|    | shared repl backlog    |                      |                     |
|    | (uncompressed bytes)   |                      |                     |
|    |   |                    |                      |                     |
|    |   v                    |                      |                     |
|    | beforeSleep:           |                      |                     |
|    |  for each replica:     |                      |                     |
|    |   route write job ---->| private inbox        | private inbox       |
|    |   (sticky to owner)    |  (compressed         |                     |
|    |                        |   replica A,B)       |                     |
|    |                        |                      |                     |
|    |                        | writeToReplicaCompressed                   |
|    |                        |   step1: drain       |                     |
|    |                        |     leftover         |                     |
|    |                        |   step2: compress    |                     |
|    |                        |     (LZ4 frame)      |                     |
|    |                        |   step3: flush       |                     |
|    |                        |   step4: connWrite   |                     |
|    |                        |   ----- socket ----> to replica            |
|    |                        |                      |                     |
|    | postWriteToReplica:    |                      |                     |
|    |  <- completion --------|                      |                     |
|    |  if compressed_buf     |                      |                     |
|    |  fully sent:           |                      |                     |
|    |    advance backlog     |                      |                     |
|    |    cursor by raw bytes |                      |                     |
|    |                        |                      |                     |
+--------------------------------------------------------------------------+

Replica side: decompression runs inline on the main thread as part of the existing readQueryFromClient path. No worker threads, no submit/completion queues; the decompressed bytes feed directly into processInputBuffer which already lives on the main thread.

Replica
+--------------------------------------------------------------------------+
|                            MAIN PROCESS                                  |
|                                                                          |
|  Thread 0 (main)                                                         |
|    |                                                                     |
|    | readQueryFromClient (primary connection)                            |
|    |   |                                                                 |
|    |   v                                                                 |
|    | replDecompressQueryBuf                                              |
|    |   feed compressed bytes ----> streamReader (push mode)              |
|    |   pull decompressed RESP <----                                      |
|    |   adjust read_reploff by raw->decoded delta                         |
|    |   |                                                                 |
|    |   v                                                                 |
|    | processInputBuffer (apply replicated commands)                      |
|    |                                                                     |
+--------------------------------------------------------------------------+

The data-flow diagram above shows what gets compressed; this section shows where the work runs. Together they describe the full path from the shared replication backlog on the primary to the apply loop on the replica.

Key Design Decisions

  1. Capability negotiation reuses the existing PSYNC handshake. Replicas that support compression advertise capa compression; the primary records REPLICA_CAPA_COMPRESSION on the per-replica bitmask and only compresses for replicas that opted in.

  2. One long-lived LZ4 frame per replica connection. Frame state was negotiated at handshake; replicas at different PSYNC offsets each see their own frame. Frame-done mid-stream is treated as a protocol violation and triggers disconnect.

  3. Per-replica streamWriter on the primary, singleton push-mode streamReader on the replica. A replica only ever has one primary at a time; per-client decoder state is unnecessary on the replica side. Push-mode streamReader is a new API surface added to feed the asynchronous replication read path.

  4. Compression dispatch reuses the existing IO thread infrastructure. Compressed replica writes flow through the same JOB_REQ_WRITE_CLIENT path as normal client writes. Optional thread affinity (config-toggleable) routes per-replica writes to a sticky owner thread for cache locality, with a self-organizing rebalance on scale-up.

  5. STREAM_KIND_REPL distinguishes replication frames from RDB frames. The replica's decoder rejects mismatched VKCS envelopes early. RDB STREAM_KIND_RDB payloads from Streaming Compression support for RDB valkey-io/valkey#3531 are unaffected.

  6. Replication-offset accounting stays in logical (uncompressed) bytes. replDecompressQueryBuf adjusts read_reploff by the raw→decoded delta so PSYNC offset invariants are preserved across compressed and uncompressed connections.

  7. LZ4 fast mode (level 0) chosen for max throughput. LZ4 fast can compress at ~5 GB/s on modern x86 vs ~500 MB/s for HC modes; for replication, throughput typically dominates because the network is the bottleneck before compression CPU is. Trade-off: ~10-15% worse compression ratio than HC modes.

  8. Runtime replcompression no triggers markCompressedReplicasForDisconnect. Existing compressed replicas get an async disconnect via freeClientAsync since their LZ4 frames can't transition mid-link. Replicas reconnect and renegotiate plaintext if the primary stops compressing.

Thread affinity

When repl-compression-thread-affinity is on (default), each compressed replica sticks to a single IO thread that owns its long-lived LZ4 frame state. The design is dynamic in two ways:

  1. Lazy ownership. A compressed replica's affinity_tid starts unset (-1). The first IO thread that picks up its write job claims ownership: c->repl_data->affinity_tid = my_tid. From then on, every job for that replica is routed to that thread's private SPSC inbox (io_private_inbox[tid]). No upfront pinning at connect time, no main-thread bookkeeping at handshake, and no work for replicas that never accumulate enough traffic to need it.

  2. Event-driven rebalance on scale-up. When the IO thread pool grows under load (active_io_threads_num increases), replBalanceAffinity() runs and re-spreads existing compressed replicas across the wider pool. Without this, all replicas owned by the original few threads would stay clumped there, leaving new threads idle.

If the owner's private inbox is full, the thread is sleeping, or the IO pool has shrunk past the owner's slot, the write falls through to the shared inbox. The owner counter (debug_thread_switches) increments only on a real change of processing thread, so it surfaces both genuine rebalance events and graceful fallbacks for diagnostics. affinity_tid resets to -1 on replica disconnect so a reconnect re-claims fresh ownership.

The benefit is L1/L2 cache locality on the per-replica streamWriter struct and compressed_buf SDS. The Performance section quantifies this: thread switches drop by 4 to 5 orders of magnitude with no measured throughput cost when affinity is disabled, so operators with cache-pressure-sensitive workloads can leave it on, and operators who prefer a pure shared-inbox model can turn it off without throughput regression.

Configs

Config Default Description
replcompression no Enables replication compression on a primary or replica. Default is off.
repl-compression-thread-affinity yes Pins each compressed replica to a sticky IO thread for cache locality on the long-lived LZ4 frame state. When no, all writes go through the shared inbox.

Internal constants (not user-configurable in this PR; surfaced for reviewer context):

Constant Value Notes
REPLICA_CAPA_COMPRESSION 1 << 4 PSYNC capability bit advertised as capa compression.
REPL_COMPRESSION_ALGO ALGO_LZ4 Fixed for now; a repl-compression-algo knob can be exposed once we have benchmarks comparing other codecs.
REPL_COMPRESSION_LEVEL 0 LZ4 fast mode, chosen for replication throughput (see Performance section for the LZ4 vs ZSTD trade-offs).
REPL_COMPRESSION_BATCH_LIMIT 1 MB Per-dispatch raw-bytes cap; bounds worst-case batch latency and keeps compressed_buf size predictable.
REPL_STREAM_DECODER_OUTPUT_MAX 256 MB Replica-side decompressed-output cap; prevents unbounded buffering from a malformed stream.

Observability

INFO replication per-replica fields:

  • compression=lz4, compressed_bytes, uncompressed_bytes, compression_ratio, compression_errors
  • compression_cpu_usec: primary-side CPU spent in compression for this replica
  • debug_compression_pending_drains: backpressure indicator (resume-pending path counter)
  • debug_thread_switches: count of thread changes for IO offload (diagnostic)

INFO replication server-level (replica side):

  • repl_decompression_errors, repl_decompression_cpu_usec, repl_decompressed_bytes_total
  • repl_apply_cpu_usec, repl_apply_batches

Performance

Tested on r7g.4xlarge (16 vCPU, ARM Graviton3) primary in us-east-1 with 8 IO threads, 2 cross-region replicas in us-west-2, and 1 same-region uncompressed replica. Client: c7g.2xlarge (8 vCPU, same VPC), 30 connections, pipeline depth 50, COB limit 10.6 GB.

Test 1: BlockMesh tweets: 3M keys × ~315 byte JSON values (1,073 MB uncompressed per replica).
Test 2: CNN DailyMail: 3M keys × ~4 KB news articles (~12,260 MB uncompressed per replica), 287K unique articles shuffled across 3M keys.

Compression effectiveness and primary-side cost (lower ratio and CPU are better)

BlockMesh tweets (~315 byte values, 1,073 MB uncompressed):

Algorithm Ratio Wire bytes/replica Bandwidth savings Compression CPU/replica
LZ4 level 0 (default) 0.48 515 MB 52% 2.5s
ZSTD level 0 0.34 365 MB 66% n/a
ZSTD level 3 0.34 362 MB 67% 12.1s
ZSTD level 6 0.30 322 MB 70% 19.2s
ZSTD level 9 0.25 268 MB 75% 29.7s

CNN DailyMail (~4 KB values, ~12,260 MB uncompressed):

Algorithm Ratio Wire bytes/replica Bandwidth savings
LZ4 level 0 (default) 0.61 ~7,500 MB 39%
ZSTD level 0 0.37 ~4,500 MB 63%

Primary write throughput (SET keys/sec, BlockMesh)

Config Pipeline 10 Pipeline 20 Pipeline 50
No compression 105,002 111,493 117,315
LZ4 level 0 104,627 111,389 118,015
ZSTD level 3 103,403 112,433 118,163
ZSTD level 6 102,770 111,520 117,618
ZSTD level 9 102,422 111,641 113,948

Compression overhead vs uncompressed baseline is <1% across all algorithms at pipeline=50, and ≤3% at pipeline=10.

Replica-side decompression cost (BlockMesh, 1,073 MB decompressed)

Config Decompression CPU Apply CPU Total replica CPU Overhead vs uncompressed (3.64s)
No compression 0.12s 3.52s 3.64s baseline
LZ4 level 0 0.86s 3.46s 4.33s +19%
ZSTD level 0 2.62s 3.57s 6.19s +70%
ZSTD level 3 3.05s 3.80s 6.85s +88%
ZSTD level 6 2.51s 3.83s 6.34s +74%
ZSTD level 9 1.74s 3.36s 5.09s +40%

Higher ZSTD levels can have lower decompression CPU because fewer bytes arrive on the wire (more compressed). LZ4 decompresses at ~1.25 GB/s; ZSTD-9 at ~617 MB/s.

Thread affinity (repl-compression-thread-affinity)

Tested ON vs OFF at LZ4 level 0, pipeline 50, 2 cross-region replicas:

Dataset Affinity Throughput Thread switches per replica
BlockMesh (3M × 315 B) ON 118,651 keys/s 34, 2
BlockMesh (3M × 315 B) OFF 118,090 keys/s 795,025, 844,262
CNN DailyMail (3M × 4 KB) ON 84,113 keys/s 36, 4
CNN DailyMail (3M × 4 KB) OFF 84,018 keys/s 2,270, 2,734

Throughput is unchanged with or without affinity. The affinity benefit is dramatically fewer thread switches for the long-lived LZ4 frame state, 4 to 5 orders of magnitude on small-value workloads. This translates to better cache locality on the IO thread that owns the LZ4 stream, with no measured throughput cost when affinity is disabled. Default is ON.

Notes

  1. Push-mode streamReader API (streamReaderCreatePush, streamReaderFeed, streamReaderFeedEnd, streamReaderNeedsInput, streamReaderFrameDone) was added on top of the pull-mode reader in Streaming Compression support for RDB valkey-io/valkey#3531. This is the API surface that lets event-loop callers feed bytes into the decoder asynchronously.

  2. Memory accounting via streamWriterMemUsage includes the per-replica streamWriter scratch buffer in getClientOutputBufferMemoryUsage, so client-output-buffer-limit correctly bounds compressed-replica memory.

  3. CI adds a test-replication-compression job that runs the replication-tagged integration tests with replcompression yes to exercise compression across the broader replication test surface.

  4. The Performance section includes ZSTD level 0/3/6/9 numbers for comparison against LZ4. This PR ships LZ4 only; ZSTD support for replication compression will land in #3798, and the per-algorithm default-level scaffolding in src/server.h (REPL_COMPRESSION_DEFAULT_LEVEL_LZ4, with a commented-out ..._ZSTD placeholder) is set up so wiring ZSTD in is additive rather than a refactor.

Related to valkey-io#3531.

Comment thread valkey.conf Outdated
Comment thread valkey.conf Outdated
Comment thread src/compression.h Outdated
Comment thread tests/integration/repl-compression.tcl
@roshkhatri roshkhatri force-pushed the replication-streaming-compression-pr branch from 4a72480 to 0fc757a Compare May 8, 2026 20:18
@roshkhatri roshkhatri force-pushed the repl-compression-handshake-v2 branch from 8cd87d7 to 4dccdd4 Compare May 8, 2026 20:22
@roshkhatri roshkhatri force-pushed the replication-streaming-compression-pr branch 4 times, most recently from 0b91402 to 93a5ad0 Compare May 8, 2026 21:32
Comment thread src/server.c Outdated
"slave_read_only:%d\r\n", server.repl_replica_ro,
"replica_announced:%d\r\n", server.replica_announced));
if (server.repl_decompression_errors > 0) {
info = sdscatprintf(info, "repl_decompression_errors:%zu\r\n",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sdscatfmt

@roshkhatri roshkhatri force-pushed the replication-streaming-compression-pr branch 2 times, most recently from 522e226 to 3cd7c16 Compare May 13, 2026 16:17
roshkhatri added a commit that referenced this pull request May 13, 2026
sdscatfmt is faster than sdscatprintf — it skips printf-style format
parsing and uses its own lightweight specifier set. Matches the pattern
used elsewhere in INFO output for unsigned counters (%U + cast to
unsigned long long).

Addresses review comment on PR #15.
roshkhatri added a commit that referenced this pull request May 13, 2026
Adds a CI job that re-runs all replication-tagged integration tests
with replcompression=yes applied globally via ::global_overrides.
Exercises the streaming-compression transport across the full
replication test surface (replication, psync, dual-channel, buffer
management, AOF-sync, cross-version, block-repl, replica-redirect)
to catch regressions that only surface under compressed replication.

Per review feedback on PR #15.
enjoy-binbin and others added 13 commits May 15, 2026 11:55
…alkey-io#3703)

This test will insert keys, and as can be seen from the logs, the insertion
somehow is very slow in this CI, eventually causing a repl-timeout disconnection.
```
50655:M 14 May 2026 00:44:57.812 - DB 0: 5 keys (0 volatile) in 7 slots HT.
50655:M 14 May 2026 00:45:02.884 - DB 0: 6 keys (0 volatile) in 7 slots HT.
50655:M 14 May 2026 00:45:06.014 # Timing out slot migration xxx after not receiving ack for too long
```

Set repl-timeout for these tests to prevent the timeout disconnections.
Also the tests does not actually require inserting distinct keys, we only
need to fill the replication buffer, so there is no need for the different
keyname, this can save the CI some memory.

Closes valkey-io#3702.

Signed-off-by: Binbin <binloveplay1314@qq.com>
…alkey-io#3719)

## Summary

This PR addresses 2 race conditions where deferred `freeClient`
(introduced by valkey-io#3324) clobbers replication state set by `REPLICAOF`
commands.

Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI
failure in `tests/unit/wait.tcl`.

## Root Cause

Since PR valkey-io#3324 ("Redesign IO threading communication model"),
`freeClient()` on a primary client with pending IO is deferred via
`freeClientAsync` (gated on `clientHasPendingIO`). When the deferred
free eventually executes, it chains through `replicationCachePrimary()`
-> `replicationHandlePrimaryDisconnection()`, which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

This causes two bugs:

### Bug 1: REPLICAOF NO ONE (SIGSEGV)

`replicationUnsetPrimary()` sets `primary_host = NULL` before calling
`freeClient`. The deferred free runs later, sets `repl_state =
REPL_STATE_CONNECT` while `primary_host` is still NULL.
`replicationCron` then calls `connectWithPrimary()` which passes NULL to
`connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV.

### Bug 2: REPLICAOF newhost newport (connection leak)

`replicationSetPrimary()` calls `freeClient(old_primary)` (deferred),
then sets `primary_host` to the new IP and progresses `repl_state` to
`REPL_STATE_CONNECTING` with a new connection handle in
`server.repl_transfer_s`. The deferred free runs later, clobbers
`repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls
`connectWithPrimary()` again, overwriting `server.repl_transfer_s`
without closing the previous connection -- an FD leak.

## Fix

Make `replicationHandlePrimaryDisconnection()` only transition to
`REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED`
and `primary_host` is set. This means the disconnection is genuine and
no other state transition has already occurred. If `repl_state` has
already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is
stale and the function leaves the state untouched.

Additionally:
- `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in
depth).
- `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test,
and add dedicated tests for the repoint scenario.

## Reproduction

Reproduced locally by establishing TLS replication and executing
`REPLICAOF NO ONE`:
- Without fix: server crashes with signal 11, accessing address 0x0
- With fix: server continues operating normally

## Testing

- Full `unit/wait` test suite passes (51/51) with IO threads enabled
- New tests "Repoint replica between primaries does not leak connections
or crash" and "Rapid repoint does not crash or leak" pass
- Crash reproduced locally over TLS (SIGSEGV without fix, graceful
handling with fix)

---------

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…gtest (valkey-io#3683)

release.c is compiled into valkey-cli, valkey-benchmark, and the
valkeylib-gtest static library, but only valkey-server declared a
dependency on the release_header custom target that runs mkreleasehdr.sh
to generate src/release.h. On a clean build (no stale release.h on disk)
or under high parallelism, those targets could attempt to compile
release.c before the header was generated and fail with "release.h: No
such file or directory".

Add the missing add_dependencies(... release_header) for all three.

Signed-off-by: Vadym Khoptynets <1099644+poiuj@users.noreply.github.com>
The hash-seed config is an sds string and may contain embedded NUL
bytes (sdssplitargs preserves \xNN escapes inside double quotes).

In the old code getHashSeedFromString() used strlen() on it, so two
hash-seed values differing only after a \x00 byte collapsed to the
same SipHash seed, can break it.

hash-seed was added in valkey-io#2608.

---------

Signed-off-by: Binbin <binloveplay1314@qq.com>
…alkey-io#3505)

When a replica receives a keyless command from a client that has
negotiated `CAPA REDIRECT` and has not sent `READONLY`, redirect it to
the primary with a `-REDIRECT` response.

Previously, keyless commands like `SCAN, DBSIZE, RANDOMKEY` always
executed locally on replicas, and keyless writes like FLUSHDB, FLUSHALL
returned -READONLY error with no way for clients to discover the
primary.

This reuses the existing `CAPA REDIRECT` capability. The capa signals
that the client can handle -REDIRECT responses — it does not prescribe
when redirects are sent. Clients that send READONLY opt into local
replica execution and are not redirected.

Only commands with CMD_READONLY or CMD_WRITE are redirected, so
connection/admin commands (PING, READONLY, CLIENT) remain unaffected.
EXEC with all-keyless queued commands (c->slot == -1) is treated as
keyless and the transaction is discarded.

---------

Signed-off-by: Yana Molodetsky <yamolodu@amazon.com>
Signed-off-by: Yana Molodetsky <59420437+yanamolo@users.noreply.github.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…cations (valkey-io#3743)

When a module subscribes to NOTIFY_HASH keyspace events and blocks the
client in the notification callback, hexpireGenericCommand,
hgetdelCommand,
and hpersistCommand would trigger debugServerAssert in
notifyKeyspaceEvent()
because addReplyArrayLen() sets buffered_reply=1 before the notification
fires. This debug assert was added in valkey-io#1819.

Affected commands: HEXPIRE, HPEXPIRE, HEXPIREAT, HPEXPIREAT, HGETDEL,
HPERSIST.

---------

Signed-off-by: Binbin <binloveplay1314@qq.com>
…ey-io#3678)

Closes valkey-io#3663.

## Why

Three callers in `src/modules/lua/script_lua.c` lost error-code handling
during the valkey-io#2858 move of the Lua scripting engine into a module. On the
wire, `redis.pcall()` (no args), `redis.setresp(4)`, `pcall(redis.log,
1)`, `pcall(redis.log, 10, 'msg')`, and `pcall(redis.set_repl)` now
return the bare message instead of `ERR <message>`. OOM and WRONGTYPE
happened to keep working because their code letters survived elsewhere
by accident.

Reproducer:

```
$ ./src/valkey-cli eval "return redis.pcall()" 0
"Please specify at least one argument for this redis lib call"   # before this PR
"ERR Please specify at least one argument for this redis lib call"  # with this PR
```

## What changed

In `src/modules/lua/script_lua.c`:

1. `luaPushErrorBuff` — bare-message branch (case 2): prepend `ERR `
unless the message already starts with a code letter (defensive — five
existing callers hardcode `"ERR ..."` strings; double-prefixing them
would also be wrong).
2. `luaProcessReplyError` push_error branch: re-add the leading `-`
before delegating to `luaPushError` so the case-1 code-extraction path
handles `OOM`, `WRONGTYPE`, `READONLY`, etc.
3. `errorCallback` errno==0 branch: same re-add-`-` approach.

41 LoC across the three functions.

## Test

`tests/unit/scripting.tcl` regression at line 1028 covers:

- All five direct-API errors above — asserts `ERR ` prefix
- WRONGTYPE error path — asserts the code survives and is not
double-prefixed
- Runs 4 times under the `foreach is_eval × foreach
script_compatibility_api` matrix

Fails on `unstable`, passes with this PR.

---------

Signed-off-by: 1fanwang <1fannnw@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
…#3756)

This reverts the
[commit](valkey-io@fdd9039)
that was merged as part of the PR valkey-io#3544 due to a performance regression
observed [here](valkey-io#3750)

Signed-off-by: akash kumar <akumdev@amazon.com>
Update the data-size to match the automated perf benchmarking as 96 is
also embedded so moving to 128 to test for non-embedded.

I have also update the wfs to use the config files from the
`valkey-perf-benchmark` so we don't need to track the configs in 2
places, everything will be pulled from source `valkey-perf-benchmark`
and the configs will be standard throughout all runs

---------

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…alkey-io#3766)

The HFE-blocking-keyspace-notify test added in valkey-io#3743 is racy and
intermittently fails with:

```
Expected '{event del key h1} {event hdel key h1}' to be equal to '{event hdel key h1}'
(context: ... cmd {assert_equal [lsort {{event hdel key h1} {event del key h1}}] [lsort [r b_keyspace.events]]} proc ::test)
```

## Why it races

When `HGETDEL` empties the hash, `hgetdelCommand` fires two
notifications back-to-back on the main thread:

1. `notifyKeyspaceEvent(NOTIFY_HASH, "hdel", ...)`
2. `notifyKeyspaceEvent(NOTIFY_GENERIC, "del", ...)` (the key was
removed)

The `block_keyspace_notification` test module's callback always spawns a
background thread that sleeps 1s, appends to the event log, and
optionally unblocks the client. For the second notification,
`RM_BlockClient` returns `NULL` (the client is already blocked by the
first), so the second thread never unblocks anyone — it only appends
`"del"` to the log.

The first thread is the one that unblocks the client, so `HGETDEL`
returns as soon as that thread has logged `"hdel"`. The test client then
immediately reads `b_keyspace.events`, racing the second thread which
has not yet acquired the event-log mutex. On a loaded host or under
valgrind the second thread routinely loses the race, leaving only
`"hdel"` visible.

## Fix

`wait_for_condition` until both events have been logged before asserting
on their contents. This matches the pattern already used by the four
other event-log assertions in this file (e.g. the `Server-created
keyspace notification` and `Event that fires twice` blocks).

```tcl
wait_for_condition 50 100 {
    [llength [r b_keyspace.events]] >= 2
} else {
    fail "Did not see both hdel and del events: [r b_keyspace.events]"
}
assert_equal [lsort {{event hdel key h1} {event del key h1}}] [lsort [r b_keyspace.events]]
```

Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Signed-off-by: Harkrishn Patro <bunty.hari@gmail.com>
…key-io#3800)

### Bug

When `ACL LOAD` deletes a user, it calls `freeClientOrCloseLater()` on
clients authenticated as that user. If `freeClient()` defers the actual
free (because the client has `flag.protected` set or has pending IO),
the client remains in `server.clients` with `c->user` still pointing to
the old user object.

After `raxFreeWithCallback(old_users, ACLFreeUserVoid)` frees all old
user objects, `c->user` becomes a dangling pointer. Any subsequent code
that dereferences `c->user` on the deferred client triggers a
heap-use-after-free.

### Crash sequence

1. Client A runs `ACL LOAD`
2. Client B is authenticated as a user that was removed from the ACL
file
3. Client B has `flag.protected` set (e.g. has pending IO (IO threads)
4. `ACLLoadFromFile` → `freeClientOrCloseLater(B, 0)` → `freeClient(B)`
→ defers to `freeClientAsync` because B is protected
5. `raxFreeWithCallback(old_users, ...)` frees all old user objects
including B's user
6. B is still in `server.clients` with `c->user` pointing to freed
memory
7. `CLIENT LIST` or any path that calls `catClientInfoString`
dereferences `c->user->name` → **UAF**

### Fix

Set `c->user` to default user before calling `freeClientOrCloseLater()`
in `ACLLoadFromFile`. The existing NULL guard in `catClientInfoString`
(`client->user ? client->user->name : "(superuser)"`) handles this
safely.

### Test

Added `DEBUG PROTECT-CLIENT <client-id>` subcommand that calls
`protectClient()` on the target, forcing `freeClient` to defer to async.
This enables deterministic reproduction without timing races.

The test:
1. Creates a user, connects a client as that user
2. Protects the client via `DEBUG PROTECT-CLIENT`
3. Removes the user from the ACL file and runs `ACL LOAD`
4. Runs `CLIENT LIST` which dereferences `c->user->name`

Without the fix, this crashes under ASAN with:
```
ERROR: AddressSanitizer: heap-use-after-free in catClientInfoString
READ of size 8
freed by: raxRecursiveFree → raxFreeWithCallback → ACLLoadFromFile
```

---------

Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
…alkey-io#3723)

1. Fix double-call to finishSlotMigrationJob in
clusterUpdateSlotImportsOnOwnershipChange(): when n == myself, the
function was called once with 'assigned to myself' message, then fell
through to call it again with 'no longer owned by source'. Added else
branch to make the two error paths mutually exclusive.

2. Fix RESP protocol violation in clusterCommandSyncSlotsFinish():
addReply(c, shared.ok) was sent unconditionally before validating the
job name and state, so on error the client would receive both +OK and
-ERR. Moved the OK reply after all validations pass.

Note that it is an internal command and the only clients sending it are
primary to replica connections and fake AOF clients. Both of those turn
off replies, so the reply doesn't actually get parsed and therefore the
violation has no effect. But good to clean it up

Signed-off-by: chx9 <lovelypiska@outlook.com>
sarthakaggarwal97 and others added 26 commits June 2, 2026 20:29
Collapse the save-notice if/else into one serverLog that always names the
active algorithm (none/lzf/lz4), instead of branching the message.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Replace em dashes in the compression comments with ASCII punctuation and
add a short note above streamWriterCreate describing the writer/reader
push/pull model and the finish-before-free requirement.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…ey-io#3848)

Harden isValidAuxChar() to reject control characters (0x00-0x1F, 0x7F)
and format-significant delimiters (comma, equals, quotes, backslash,
space) that could corrupt `nodes.conf` parsing or enable injection of
crafted node entries on restart.

Additionally, add a validator for `cluster-announce-ip` which previously
had no input validation, allowing arbitrary bytes to be persisted into
the cluster config file.

---------

Signed-off-by: Eran Ifrah <eifrah@amazon.com>
…#3904)

## Problem

The shard ID restart tests in `cluster-shards.tcl` fail intermittently
under valgrind because `pause_process` (SIGSTOP) can hit before the
deferred config save is flushed to `nodes.conf`.

When a replica learns its primary's shard ID via gossip, it updates
`myself->shard_id` and sets `CLUSTER_TODO_SAVE_CONFIG` but the actual
write only happens in `clusterBeforeSleep()` during the next event loop
iteration. Under valgrind's slower execution, the test's SIGSTOP can
arrive before that flush, so on restart the node gets a fresh random
shard ID instead of the persisted one.

## Fix

Add `CLUSTER SAVECONFIG` before pausing nodes in both restart tests,
ensuring `nodes.conf` contains the current shard IDs on disk before
SIGSTOP. This follows the same pattern used in `latency-monitor.tcl`.

Fixes valkey-io#3883.

Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
This changes helps speed up the clang-format job. We don't really need
to upgrade every pre-installed package on the runner (firefox and a
bunch of other things the job never uses).

Before this change: 2m 47s
After this change: 31s

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- Simplify the streaming APIs: caller-owned structs with Init/Free and
  descriptive self-parameter names; codec used only for the algorithm
  vtable.
- Store the compression algorithm id directly in the VKCS envelope and
  drop the separate codec enum plus its translation functions.
- Make the rio decorator opaque: track connection-backed streams with a
  flag instead of a transport-type enum.
- Route LZ4 contexts through zmalloc and assert on caller errors; keep
  runtime errors for corrupt or external input.
- Document the feed functions' streaming contract, mark the retriable
  capacity-shortage returns, and drop a redundant probe re-zero.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
valkey-check-rdb reported offsets via rioTell() on every read, and
rdbLoadProgressCallback computed the transport offset via rioTell() on
every read even when no progress event was due. Read processed_bytes
directly for the offset, and only compute the transport position when a
progress event is actually reported.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…ey-io#3888)

Rework commandDbIdArgs so each helper returns the argv index of every dbid
argument it owns. ACLSelectorCheckCmd uses those positions to set keyidxptr
directly, replacing the cmd->proc-based offset table. The caller now reads
the dbid value via getLongLongFromObject(argv[positions[i]], ...). New
db=-checked commands only need to implement their own *DbIdArgs helper.

The fix is for COPY: the old chain fell through to 0, so the "object"
field in ACL LOG was always showed "copy", it now reports the first
denied dbid.

Based on valkey-io#3801, also see it for more details, DB ACL was added in valkey-io#2309.

Signed-off-by: Binbin <binloveplay1314@qq.com>
…lision (valkey-io#3925)

## Problem

The per-library function dict (`libraryFunctionDictType`) compares
function names **case-sensitively**, while the global function dict
(`functionDictType`) compares them **case-insensitively**
(`dictCStrCaseHash` + `dictSdsKeyCaseCompare`). This asymmetry lets a
single library register two functions whose names differ only in case,
leaving the two dicts inconsistent and crashing the server on the next
unlink.

### Reproducer (any client with the default `SCRIPTING` ACL category)

```
FUNCTION LOAD "#!lua name=poc
redis.register_function('aaa', function() return 1 end)
redis.register_function('AAA', function() return 2 end)"
FUNCTION DELETE poc
```

The server aborts with:

```
=== ASSERTION FAILED ===
==> functions.c:332 'ret == DICT_OK' is not true
0   valkey-server   ... libraryUnlink + ...
```

### Mechanism

1. `FUNCTION LOAD` — `functionLibCreateFunction` checks for duplicates
in the per-library dict case-sensitively, so `aaa` and `AAA` are both
stored there. `libraryLink` then inserts both into the global dict, but
the second `dictAdd` silently returns `DICT_ERR` (return value ignored)
because the global dict is case-insensitive. The load-path
global-collision check iterates the per-library functions and looks each
up in the global dict, but since `AAA` was never inserted there it finds
no conflict and the load succeeds.
2. `FUNCTION DELETE` — `libraryUnlink` iterates the per-library dict (2
entries) and deletes each from the global dict. The first delete (`aaa`)
succeeds; the second (`AAA`) resolves to the same case-insensitive slot,
already removed, so `dictDelete` returns `DICT_ERR` and
`serverAssert(ret == DICT_OK)` at `functions.c:332` aborts the server.

`FUNCTION LOAD REPLACE` of an affected library hits the same path, since
it calls `libraryUnlink` on the old library internally.

Requires only the `SCRIPTING` ACL category, granted to users by default.
No admin, cluster, or special config needed.

## Fix

Make `libraryFunctionDictType` case-insensitive (`dictCStrCaseHash` +
`dictSdsKeyCaseCompare`) to match the global dict. Names differing only
in case now collide at registration, and `FUNCTION LOAD` is rejected
cleanly with `Function already exists in the library`. This is
consistent with the long-standing case-insensitive `FCALL` contract (see
the existing `FUNCTION - test function case insensitive` test).

## Testing

- Reproduced the crash on `unstable` before the fix; confirmed the
server stays up and `FUNCTION LOAD` is rejected after.
- Added two regression tests in `tests/unit/functions.tcl`. Verified
they **fail on pre-fix code** (load wrongly succeeds, delete crashes the
server) and **pass after the fix**.
- `unit/functions`: 122 passed, 0 failed. `unit/scripting`: 683 passed,
0 failed.

Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
…nf (valkey-io#3937)

Minor cleanup, now it is the real option name.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Replace the separate rdb-compression-algo config with a single
rdbcompression no|yes|lz4-stream. 'yes' keeps the legacy per-string LZF
path; 'lz4-stream' selects whole-file streaming LZ4 RDB saves.

Update valkey.conf and the RDB compression, check-rdb, and replication
AOF sync tests to use the new config value.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
The compression CI job (--config replcompression yes) ran replication-buffer.tcl
and the dual-channel buffer-memory tests, which assert exact replication
buffer/backlog memory and byte volumes. Compression legitimately changes those
(per-replica codec buffers add ~1MB scratch; fewer wire bytes let replicas keep
up), so the assertions fail under compression even though replication is
correct. Drop the repl-compression tag from replication-buffer.tcl and the two
dual-channel blocks holding the memory tests; they still run uncompressed in the
regular job. Functional dual-channel coverage stays in the compression job.

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…io#3897)

## Summary

Fixes valkey-io#3008

> lets add assert checking that the object has a key in
dbUntrackKeyWithVolatileItems and dbTrackKeyWithVolatileItems to be able
to get a more explicit error in these cases

This is addressed in this PR.

Signed-off-by: ydsakshi <ydsakshi023@gmail.com>
clusterNode.shard_id is a fixed-size char[CLUSTER_NAMELEN] buffer
that is not guaranteed to be NUL-terminated, so it must be printed
with %.40s.

This was introduced in valkey-io#2510.

Signed-off-by: Binbin <binloveplay1314@qq.com>
…valkey-io#3941)

Since valkey-io#2449 made the failover delay relative to cluster-node-timeout.
Now delay = min(cluster-node-timeout / 30, 500), any cluster-node-timeout
below 30, including the legal minimum 0 will collapses delay to zero,
and `x % 0` is undefined behaviour.

Signed-off-by: Binbin <binloveplay1314@qq.com>
…thakaggarwal97/valkey into replication-streaming-compression-pr

# Conflicts:
#	src/compression_stream.c
#	src/compression_stream.h
#	src/config.c
#	src/rdb.c
#	src/server.h
#	src/unit/test_compression.cpp
#	tests/integration/rdb-compression.tcl
#	valkey.conf
…tate and plaintext passthrough

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…val (fix macOS -Werror)

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
@roshkhatri roshkhatri force-pushed the replication-streaming-compression-pr branch from 3155962 to ef20044 Compare June 10, 2026 03:53
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
@roshkhatri

Copy link
Copy Markdown
Owner Author

Closing in favor of a fresh PR against the updated streaming-compression-rio-pr base (0f15410); GitHub had cached the old base diff.

@roshkhatri roshkhatri closed this Jun 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.