Memory usage may grow unexpectedly and cause OOM in some high-concurrency lock contention scenarios #17394

@MyonKeminta

Description

Bug Report

We noticed that the memory usage of TiKV can grow quickly and cause OOM in the following scenario:

  • Many transactions concurrently try to lock a few keys, in which case a single lock blocks many other transactions and the lock-waiting queue can become very long.
  • The lock contention happens on a TiKV node that does NOT host the leader of the deadlock detector, so each time the lock-waiting relationships are updated, an RPC is needed to notify the leader.

We are still investigating the root cause. So far, the update_wait_for operation looks very suspicious:

fn handle_update_wait_for(&mut self, events: Vec<UpdateWaitForEvent>) {
    let mut wait_table = self.wait_table.borrow_mut();
    let now = Instant::now();
    for event in events {
        let previous_wait_info = wait_table.update_waiter(&event, now);
        if event.is_first_lock {
            continue;
        }
        if let Some((previous_wait_info, diag_ctx)) = previous_wait_info {
            self.detector_scheduler
                .clean_up_wait_for(event.start_ts, previous_wait_info);
            self.detector_scheduler
                .detect(event.start_ts, event.wait_info, diag_ctx);
        }
    }
}

Consider the case where $N$ pessimistic lock requests are waiting in the queue. At some moment, the current lock held by transaction A is released and another transaction B acquires the lock. All $N$ waiters, which were previously waiting for transaction A, suddenly become waiting for B. To notify the detector of this change, $N$ clean_up_wait_for messages and $N$ detect messages are sent to the detector leader. These requests are not batched; batching them has not been implemented yet since it requires changing the protocol and introduces complexity. This finally seems to cause the problem: if the lock-waiting queue is long and the lock is acquired and released very quickly, the total OPS of sending these requests can become very high and eventually exceed the sender's ability to deliver them, causing messages to accumulate in this channel:

pub fn detect(&self, req: DeadlockRequest) -> Result<()> {
    self.sender
        .as_ref()
        .unwrap()
        .unbounded_send(req)
        .map_err(|e| Error::Other(box_err!(e)))
}

* We were initially still trying to confirm whether the above explanation is really the root cause. It has now been confirmed.


Steps to reproduce:

  1. Create a cluster with at least 2 TiKV nodes. It's ok to use tiup playground.
  2. Use pd-ctl operator commands / placement rules to make the first region (start_key == "") and the region of the tested table have their leaders on different TiKV nodes.
    Example of how this can be done:
    1. (Assuming the two TiKV nodes have store_id 1 and 2 respectively):
      # Find the first region
      REGION_ID=$(tiup ctl:v7.5.2 pd region key "00" | jq ".id")
      
      # Ensure the first region's leader is on store 2
      tiup ctl:v7.5.2 pd operator add transfer-leader $REGION_ID 2
      
      # Setup zone labels to be used by placement rules
      tiup ctl:v7.5.2 pd store label 1 zone=z1
      tiup ctl:v7.5.2 pd store label 2 zone=z2
    2. Create a placement policy that constrains all leaders to zone z1 (store 1):
      create placement policy p1 leader_constraints="[+zone=z1]";
    3. Bind the placement policy p1 when creating the table for testing.
  3. Run the following test program:
    package main
    
    import (
        "context"
        "database/sql"
        "encoding/hex"
        "flag"
        "fmt"
        "math/rand"
        "time"
    
        _ "github.com/go-sql-driver/mysql"
        "github.com/zyguan/sqlz"
        "golang.org/x/sync/errgroup"
    )
    
    var txnSize = flag.Int("txnSize", 2, "")
    var vLen = flag.Int("vlen", 32, "")
    var concurrency = flag.Int("concurrency", 200, "")
    var dsn = flag.String("dsn", "root:@tcp(127.0.0.1:4000)/test", "")
    
    func main() {
        flag.Parse()
    
        db, err := sql.Open("mysql", *dsn)
        if err != nil {
            panic(err)
        }
    
        sqlz.MustExec(db, "drop table if exists t")
        sqlz.MustExec(db, "create table t (id int primary key, v varchar(512), cnt int) placement policy=p1")
    
        // Build one INSERT ... ON DUPLICATE KEY UPDATE statement touching
        // txnSize rows, so all connections contend on the same few keys.
        genStmt := func() string {
            stmt := "insert into t values "
            for i := 0; i < *txnSize; i++ {
                v := make([]byte, *vLen/2)
                l, err := rand.Read(v)
                if err != nil || l != *vLen/2 {
                    panic(fmt.Sprint("l:", l, "err:", err))
                }
                vh := hex.EncodeToString(v)
                if i != 0 {
                    stmt += ", "
                }
                stmt += fmt.Sprintf("(%v, \"%s\", 1)", i+1, vh)
            }
            stmt += " on duplicate key update cnt = cnt + 1"
            return stmt
        }
    
        ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
        defer cancel()
    
        // Run the same contended transaction in a tight loop on every connection.
        err = sqlz.WithConns(ctx, db, *concurrency, func(conns ...*sql.Conn) error {
            wg := new(errgroup.Group)
            stmt := genStmt()
            for _, conn := range conns {
                conn1 := conn
                wg.Go(func() error {
                    for {
                        tx, err := conn1.BeginTx(ctx, nil)
                        if err != nil {
                            return err
                        }
                        _, err = tx.ExecContext(ctx, stmt)
                        if err != nil {
                            return err
                        }
                        err = tx.Commit()
                        if err != nil {
                            return err
                        }
                    }
                })
            }
    
            return wg.Wait()
        })
        if err != nil {
            panic(err)
        }
    }
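A rough back-of-the-envelope estimate of why this workload overwhelms the detector client. The handover rate below is an illustrative assumption, not a measurement; the concurrency matches the test program's default flag:

```shell
# Estimate the detector message rate under the reproduction workload.
CONCURRENCY=200   # waiters queued on the contended keys (-concurrency flag)
HANDOVERS=500     # lock acquire/release cycles per second (assumed)
# Each handover re-targets all queued waiters; every waiter produces one
# clean_up_wait_for message plus one detect message to the detector leader.
echo "messages/sec: $((2 * CONCURRENCY * HANDOVERS))"
```

Even at a modest handover rate this reaches hundreds of thousands of unbatched messages per second, far beyond what a single gRPC stream to the detector leader can drain.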

Labels

affects-7.1 (affects the 7.1.x LTS versions), affects-7.5, affects-8.1, affects-8.4, affects-8.5, impact/oom, severity/critical, sig/transaction (SIG: Transaction), type/bug (confirmed as a bug)
