Bug Report
We noticed that the memory usage of TiKV can grow quickly and cause OOM in the following scenario:
- Many transactions concurrently try to lock a few keys, in which case a lock blocks many other transactions and the lock waiting queue can be very long.
- The lock contention happens on a TiKV node that does NOT host the leader of the deadlock detector, so every update to the lock-waiting relationships requires an RPC to notify the leader.
We are still investigating the root cause. So far, the operation update_wait_for looks very suspicious:
tikv/src/server/lock_manager/waiter_manager.rs
Lines 547 to 564 in 2577ecf
```rust
fn handle_update_wait_for(&mut self, events: Vec<UpdateWaitForEvent>) {
    let mut wait_table = self.wait_table.borrow_mut();
    let now = Instant::now();
    for event in events {
        let previous_wait_info = wait_table.update_waiter(&event, now);
        if event.is_first_lock {
            continue;
        }
        if let Some((previous_wait_info, diag_ctx)) = previous_wait_info {
            self.detector_scheduler
                .clean_up_wait_for(event.start_ts, previous_wait_info);
            self.detector_scheduler
                .detect(event.start_ts, event.wait_info, diag_ctx);
        }
    }
}
```
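To get a feel for the message volume, here is a back-of-envelope sketch (a hypothetical model, not TiKV code). Assuming one lock release updates the wait-for edge of every remaining waiter in the queue, and each update emits two detector messages (a clean_up_wait_for plus a detect), the message rate scales with queue length times release rate:

```rust
// Hypothetical back-of-envelope model, not TiKV code: each
// update_wait_for event sends two messages to the detector leader
// (clean_up_wait_for + detect), and a single lock release can
// trigger an update for every remaining waiter in the queue.
fn detector_messages_per_release(queue_len: u64) -> u64 {
    2 * queue_len
}

fn main() {
    // Illustrative numbers: 1000 waiters, 500 lock releases per second.
    let msgs_per_sec = detector_messages_per_release(1000) * 500;
    assert_eq!(msgs_per_sec, 1_000_000);
    println!("detector messages/sec: {}", msgs_per_sec);
}
```

With illustrative numbers like these, the detector leader can be asked to absorb on the order of a million small RPC messages per second, which matches the symptom described below.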
Consider that both the clean_up_wait_for and detect messages are sent to the detector leader. These requests are not batched: batching would require a protocol change and adds complexity, so it has not been implemented. This appears to be what finally causes the problem: if the lock waiting queue is long and the lock is acquired and released very quickly, the total OPS of these requests can become very high, eventually exceeding the sender's capacity and causing messages to accumulate in this channel:
tikv/src/server/lock_manager/client.rs
Lines 80 to 87 in 2577ecf
```rust
pub fn detect(&self, req: DeadlockRequest) -> Result<()> {
    self.sender
        .as_ref()
        .unwrap()
        .unbounded_send(req)
        .map_err(|e| Error::Other(box_err!(e)))
}
```
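The failure mode of an unbounded channel whose producer outruns its consumer can be illustrated with a minimal sketch using Rust's standard library mpsc channel (an analogy, not the futures channel TiKV actually uses): nothing applies backpressure, so undrained messages simply pile up in memory.

```rust
use std::sync::mpsc;

fn main() {
    // std::sync::mpsc::channel is unbounded, like the unbounded_send
    // above: send() never blocks and never fails while the receiver
    // is alive, so a fast producer can queue arbitrarily many
    // messages in memory.
    let (tx, rx) = mpsc::channel::<u64>();
    for i in 0..100_000u64 {
        tx.send(i).unwrap(); // no backpressure is ever applied
    }
    // The "slow consumer" has not read anything yet; all 100_000
    // messages are buffered inside the channel at once.
    let buffered = rx.try_iter().count();
    assert_eq!(buffered, 100_000);
    println!("messages buffered before any receive: {}", buffered);
}
```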
* Update: it is now confirmed that the above explanation is the root cause.
Steps to reproduce:
- Create a cluster with at least 2 TiKV nodes. It's ok to use tiup playground.
- Use either pd-ctl operator commands or placement rules to make the first region (start_key == "") and the region of the tested table have their leaders on different TiKV nodes.
Example of how this can be done (assuming the two TiKV nodes have store_id 1 and 2 respectively):
```shell
# Find the first region
REGION_ID=$(tiup ctl:v7.5.2 pd region key "00" | jq ".id")
# Ensure the first region's leader is on store 2
tiup ctl:v7.5.2 pd operator add transfer-leader $REGION_ID 2
# Set up zone labels to be used by placement rules
tiup ctl:v7.5.2 pd store label 1 zone=z1
tiup ctl:v7.5.2 pd store label 2 zone=z2
```
- Create a placement policy that places all leaders in zone z1 (store 1):
```sql
create placement policy p1 leader_constraints="[+zone=z1]";
```
- Bind the placement policy p1 when creating the table for testing.
- Run the following test program:
```go
package main

import (
	"context"
	"database/sql"
	"encoding/hex"
	"flag"
	"fmt"
	"math/rand"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/zyguan/sqlz"
	"golang.org/x/sync/errgroup"
)

var txnSize = flag.Int("txnSize", 2, "")
var vLen = flag.Int("vlen", 32, "")
var concurrency = flag.Int("concurrency", 200, "")
var dsn = flag.String("dsn", "root:@tcp(127.0.0.1:4000)/test", "")

func main() {
	flag.Parse()
	db, err := sql.Open("mysql", *dsn)
	if err != nil {
		panic(err)
	}
	sqlz.MustExec(db, "drop table if exists t")
	sqlz.MustExec(db, "create table t (id int primary key, v varchar(512), cnt int) placement policy=p1")
	genStmt := func() string {
		stmt := "insert into t values "
		for i := 0; i < *txnSize; i++ {
			v := make([]byte, *vLen/2)
			l, err := rand.Read(v)
			if err != nil || l != *vLen/2 {
				panic(fmt.Sprint("l:", l, "err:", err))
			}
			vh := hex.EncodeToString(v)
			if i != 0 {
				stmt += ", "
			}
			stmt += fmt.Sprintf("(%v, \"%s\", 1)", i+1, vh)
		}
		stmt += " on duplicate key update cnt = cnt + 1"
		return stmt
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()
	err = sqlz.WithConns(ctx, db, *concurrency, func(conns ...*sql.Conn) error {
		wg := new(errgroup.Group)
		stmt := genStmt()
		for _, conn := range conns {
			conn1 := conn
			wg.Go(func() error {
				for {
					tx, err := conn1.BeginTx(ctx, nil)
					if err != nil {
						return err
					}
					_, err = tx.ExecContext(ctx, stmt)
					if err != nil {
						return err
					}
					err = tx.Commit()
					if err != nil {
						return err
					}
				}
			})
		}
		return wg.Wait()
	})
	if err != nil {
		panic(err)
	}
}
```