Skip to content

Commit 7e2a074

Browse files
committed
inject dummy statements when vreplication transaction_timestamp is stale
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
1 parent 1428eea commit 7e2a074

2 files changed

Lines changed: 43 additions & 3 deletions

File tree

go/vt/vttablet/onlineddl/executor.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ type Executor struct {
140140
lastMigrationUUID string
141141
tickReentranceFlag int64
142142

143+
requestForDummyInjection int64
144+
143145
ticks *timer.Timer
144146
isOpen bool
145147
schemaInitialized bool
@@ -387,6 +389,24 @@ func (e *Executor) tableExists(ctx context.Context, tableName string) (bool, err
387389
return (row != nil), nil
388390
}
389391

392+
// injectDummyStatements issues several no-op statements on the backend MySQL server. This is done to
393+
// ensure binary logs are being written, to solve a possible scenario for vreplication-based migrations
394+
// where the server is otherwise completely stale. Vreplication does not write heartbeats like gh-ost does,
395+
// and if the server is stale (and assuming lag-throttler is not enabled) then binary logs may be completely silent,
396+
// which makes it impossible to know when to cut-over.
397+
func (e *Executor) injectDummyStatements(ctx context.Context) (err error) {
398+
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
399+
if err != nil {
400+
return err
401+
}
402+
defer conn.Close()
403+
404+
if _, err := conn.ExecuteFetch(sqlDummyDropView, 0, false); err != nil {
405+
return err
406+
}
407+
return nil
408+
}
409+
390410
func (e *Executor) parseAlterOptions(ctx context.Context, onlineDDL *schema.OnlineDDL) string {
391411
// Temporary hack (2020-08-11)
392412
// Because sqlparser does not do full blown ALTER TABLE parsing,
@@ -1525,19 +1545,26 @@ func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, s *VReplS
15251545
}
15261546
return diff
15271547
}
1548+
timeNow := time.Now()
15281549
timeUpdated := time.Unix(s.timeUpdated, 0)
1529-
if durationDiff(time.Now(), timeUpdated) > cutOverThreshold {
1550+
if durationDiff(timeNow, timeUpdated) > cutOverThreshold {
15301551
return false, nil
15311552
}
15321553
// Let's look at transaction timestamp. This gets written by any ongoing
15331554
// writes on the server (whether on this table or any other table)
15341555
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
1535-
if durationDiff(transactionTimestamp, timeUpdated) > cutOverThreshold {
1556+
if diff := durationDiff(timeNow, transactionTimestamp); diff > cutOverThreshold {
15361557
// There's two ways the diff can be high:
15371558
// 1. High workload: vreplication is unable to apply events in a timely manner. This is
15381559
// a normal situation and is why we're testing transaction_timestamp
15391560
// 2. Lack of writes on the server. Possibly the server is stale. In which case, cut-over
15401561
// should be good to go. But we need (TODO) some mechanism to inform us that this is indeed the case.
1562+
1563+
// To handle (2), we request an injection of a dummy transaction, heuristically in good time for the next check.
1564+
// To not overdo it, and avoid spamming the server with dummy statements, we only request injection if lag is very high.
1565+
if diff > *migrationCheckInterval {
1566+
atomic.StoreInt64(&e.requestForDummyInjection, 1)
1567+
}
15411568
return false, nil
15421569
}
15431570
}
@@ -1793,6 +1820,18 @@ func (e *Executor) onMigrationCheckTick() {
17931820
log.Error(err)
17941821
return
17951822
}
1823+
1824+
if atomic.LoadInt64(&e.requestForDummyInjection) > 0 {
1825+
// This is a special scenario. VReplication-based online-DDL notices transaction timestamp is old. Either it
1826+
// is very busy, or the server is compeltely stale. In th elatter case, it requests dummy injection of
1827+
// statements. We comply here.
1828+
// We inject a few well-timed statements, scheduled to be fresh by *next iteration*, to be intercepted by e.reviewRunningMigrations(ctx), following
1829+
atomic.StoreInt64(&e.requestForDummyInjection, 0)
1830+
intervals := []time.Duration{*migrationCheckInterval - 2*time.Second, *migrationCheckInterval - time.Second, *migrationCheckInterval}
1831+
for _, d := range intervals {
1832+
time.AfterFunc(d, func() { e.injectDummyStatements(ctx) })
1833+
}
1834+
}
17961835
if err := e.retryTabletFailureMigrations(ctx); err != nil {
17971836
log.Error(err)
17981837
}

go/vt/vttablet/onlineddl/schema.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ const (
245245
_vt.copy_state
246246
WHERE vrepl_id=%a
247247
`
248-
sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`"
248+
sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`"
249+
sqlDummyDropView = "DROP VIEW IF EXISTS `_vt:onlineddl:executor:inject:binlog:entry`"
249250
)
250251

251252
const (

0 commit comments

Comments
 (0)