
[BUG] IO Threads: Write-heavy pipelines sometimes hang on response flush (Data is written, Client hangs) #3198

Description

@murphyjacob4

Describe the bug

There appears to be a race condition in Valkey 9.0, related to pipelined commands with I/O threads enabled, that causes clients to hang indefinitely when they send large pipelines of write commands. I could not reproduce it with GET, so it may be related to the small replies that write commands produce.
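Here are the "small replies" in numbers, assuming the GET runs read back the same 1024-byte values. A SET reply is the 5-byte simple string +OK\r\n. A GET reply is a bulk string roughly 200 times larger. The helper names below are illustrative only; the sizes follow directly from the RESP wire format.

```go
package main

import "strconv"

// simpleStringReplyLen returns the wire size of a RESP simple-string reply
// such as "+OK\r\n": the '+' prefix, the text, and the trailing CRLF.
func simpleStringReplyLen(text string) int {
	return 1 + len(text) + 2
}

// bulkStringReplyLen returns the wire size of a RESP bulk-string reply that
// carries n payload bytes: "$<n>\r\n", the n bytes, then "\r\n".
func bulkStringReplyLen(n int) int {
	return 1 + len(strconv.Itoa(n)) + 2 + n + 2
}
```

For a 100-command pipeline of 1024-byte values, that is 500 bytes of SET replies versus 103,300 bytes of GET replies. The two workloads therefore exercise the reply path very differently.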

Symptoms:

The client sends a pipeline of commands (e.g., 100 SET ops).

The server successfully processes the commands (the data appears in the database).

The server fails to flush the full response back to the client.

The client hangs (TCP connection remains open) waiting for the remaining bytes of the response.

Eventually, the client times out.

To reproduce

Start Valkey 9.0 with IO threads enabled:

valkey-server --io-threads 4

I originally discovered this while ingesting data with memtier_benchmark:

docker run --rm --network host redislabs/memtier_benchmark --data-size 1024 --threads 64 --clients 10 --pipeline 100 --requests=allkeys --key-maximum=1000000 --ratio 1:0 --key-pattern P:P --hide-histogram --server 127.0.0.1 --print-percentiles 50,95,99

The benchmark hangs, but the key count reported by INFO is correct:

...
# Keyspace
db0:keys=1000001,expires=0,avg_ttl=0,keys_with_volatile_items=0
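You can also check the key count programmatically instead of reading it by eye, for example after a stalled benchmark run. The following is a minimal sketch that parses one line of the Keyspace section. parseKeyspaceLine is a hypothetical helper, and it assumes every field value is an integer, as in the output above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseKeyspaceLine parses one line of the "# Keyspace" section of INFO,
// e.g. "db0:keys=1000001,expires=0,avg_ttl=0", into the database name and
// its integer fields. Assumes all field values are integers.
func parseKeyspaceLine(line string) (string, map[string]int64, error) {
	db, rest, ok := strings.Cut(strings.TrimSpace(line), ":")
	if !ok {
		return "", nil, fmt.Errorf("missing ':' in %q", line)
	}
	fields := make(map[string]int64)
	for _, kv := range strings.Split(rest, ",") {
		k, v, ok := strings.Cut(kv, "=")
		if !ok {
			return "", nil, fmt.Errorf("malformed field %q", kv)
		}
		n, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return "", nil, fmt.Errorf("field %q: %w", k, err)
		}
		fields[k] = n
	}
	return db, fields, nil
}
```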

So the problem appears to be confined to the response path.

Afterwards, I had an AI assistant write a simple script that reproduces the issue:

Script

package main

import (
	"bufio"
	"bytes"
	"flag"
	"fmt"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

var (
	target      = flag.String("server", "127.0.0.1:6379", "Valkey server address")
	concurrency = flag.Int("concurrency", 640, "Number of concurrent connections")
	pipeline    = flag.Int("pipeline", 100, "Pipeline size")
	payloadSize = flag.Int("size", 1024, "Data size in bytes")
	ops         uint64
)

func main() {
	flag.Parse()
	fmt.Printf("Starting WRITE-heavy repro against %s\n", *target)
	fmt.Printf("Concurrency: %d | Pipeline: %d | Payload: %d bytes\n", *concurrency, *pipeline, *payloadSize)
	fmt.Println("Workload: SET key value")

	var wg sync.WaitGroup
	wg.Add(*concurrency)

	// Start status reporter
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			count := atomic.SwapUint64(&ops, 0)
			fmt.Printf("OPS: %d\n", count)
		}
	}()

	// Launch Load
	for i := 0; i < *concurrency; i++ {
		go func(id int) {
			defer wg.Done()
			worker(id)
		}(i)
	}

	wg.Wait()
}

func worker(id int) {
	conn, err := net.DialTimeout("tcp", *target, 5*time.Second)
	if err != nil {
		log.Printf("Worker %d failed to connect: %v", id, err)
		return
	}
	defer conn.Close()

	if tcpConn, ok := conn.(*net.TCPConn); ok {
		tcpConn.SetNoDelay(true)
	}

	// 1. Pre-build the pipeline buffer for SET commands
	// We use a unique key per worker to avoid lock contention masking the IO bug
	// Key: repro:<id>
	key := fmt.Sprintf("repro:%d", id)
	value := bytes.Repeat([]byte("x"), *payloadSize)

	// RESP: *3\r\n$3\r\nSET\r\n$<klen>\r\n<key>\r\n$<vlen>\r\n<val>\r\n
	cmdHeader := fmt.Sprintf("*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n", len(key), key, len(value))
	// Combine header + body + CRLF
	fullCmd := append([]byte(cmdHeader), value...)
	fullCmd = append(fullCmd, []byte("\r\n")...)

	var buf bytes.Buffer
	for i := 0; i < *pipeline; i++ {
		buf.Write(fullCmd)
	}
	payload := buf.Bytes()

	reader := bufio.NewReader(conn)

	for {
		// Strict Deadline
		conn.SetDeadline(time.Now().Add(2 * time.Second))

		// Write the massive pipeline
		if _, err := conn.Write(payload); err != nil {
			log.Printf("Worker %d write failed: %v", id, err)
			return
		}

		// Read exactly 'pipeline' responses (Expecting +OK)
		for i := 0; i < *pipeline; i++ {
			line, err := reader.ReadBytes('\n')
			if err != nil {
				log.Printf("Worker %d HANG/ERROR on reply %d/%d: %v", id, i+1, *pipeline, err)
				return
			}

			// Verify the reply is a simple string (expected "+OK\r\n").
			// Only the leading '+' is checked; nothing is trimmed.
			if len(line) < 1 || line[0] != '+' {
				log.Printf("Worker %d unexpected response: %s", id, string(line))
				return
			}
		}
		atomic.AddUint64(&ops, uint64(*pipeline))
	}
}
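The script builds the RESP encoding for SET by hand. The general form is an array header followed by one bulk string per argument. This sketch produces the same bytes for SET repro:<id> <value> and makes it easy to swap in another command, for example to compare against GET. encodeCommand and buildPipeline are hypothetical helpers and are not part of the original script or any client library.

```go
package main

import (
	"bytes"
	"strconv"
)

// encodeCommand encodes args as a RESP array of bulk strings:
// "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" for each argument.
func encodeCommand(args ...string) []byte {
	var b bytes.Buffer
	b.WriteByte('*')
	b.WriteString(strconv.Itoa(len(args)))
	b.WriteString("\r\n")
	for _, a := range args {
		b.WriteByte('$')
		b.WriteString(strconv.Itoa(len(a)))
		b.WriteString("\r\n")
		b.WriteString(a)
		b.WriteString("\r\n")
	}
	return b.Bytes()
}

// buildPipeline concatenates n copies of one encoded command, the same way
// the script fills its pipeline buffer.
func buildPipeline(cmd []byte, n int) []byte {
	return bytes.Repeat(cmd, n)
}
```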

Run with:

go run repro.go --server 127.0.0.1:6379 --concurrency 640 --pipeline 100

Workers intermittently hang while reading responses and eventually fail with an i/o timeout, even though the server is still up.

$ go run /tmp/repro.go --server 127.0.0.1:6379 --concurrency 640 --pipeline 100
Starting WRITE-heavy repro against 127.0.0.1:6379
Concurrency: 640 | Pipeline: 100 | Payload: 1024 bytes
Workload: SET key value
OPS: 860300
OPS: 1092100
OPS: 1105700
OPS: 1119900
2026/02/13 06:34:55 Worker 525 HANG/ERROR on reply 58/100: read tcp 127.0.0.1:40750->127.0.0.1:6379: i/o timeout
2026/02/13 06:34:55 Worker 563 HANG/ERROR on reply 58/100: read tcp 127.0.0.1:40412->127.0.0.1:6379: i/o timeout
2026/02/13 06:34:55 Worker 481 HANG/ERROR on reply 58/100: read tcp 127.0.0.1:44262->127.0.0.1:6379: i/o timeout
...
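Each SET reply is the 5-byte +OK\r\n, so the log lines above can be turned into byte counts. This worked sketch uses a hypothetical helper, flushProgress. It assumes that no partial bytes of the stalled reply had arrived, because the script's log does not record them.

```go
package main

// flushProgress reports, for a pipeline of `total` replies of `replySize`
// bytes each, how many bytes had arrived and how many were still missing
// when the client stalled waiting for reply number `stalledOn` (1-based).
// Assumes no partial bytes of the stalled reply were received.
func flushProgress(stalledOn, total, replySize int) (received, missing int) {
	complete := stalledOn - 1 // replies 1..stalledOn-1 were read in full
	received = complete * replySize
	missing = (total - complete) * replySize
	return received, missing
}
```

For the three workers above, stalling on reply 58/100 means 57 replies (285 bytes) arrived and 43 replies (215 bytes) were never flushed.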

Expected behavior

All responses should be flushed, so that each client receives one reply for every pipelined command.

Additional information

Environment:

Version: Valkey 9.0
OS: Linux

I tested on 8.0 and could not reproduce this. On 8.1 I observed it once, much less frequently than on 9.0.

Labels: bug (Something isn't working)
