Skip to content

Commit 261b06a

Browse files
committed
Treat isOpen as the ready/running signal.
Also align all initMutex usage. Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent 0de6a72 commit 261b06a

2 files changed

Lines changed: 32 additions & 25 deletions

File tree

go/test/endtoend/throttler/util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,14 @@ func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabl
141141
return
142142
}
143143
}
144-
// If the tablet is PRIMARY Not Serving due to e.g. being involved
145-
// in a Reshard where its QueryService is explicitly disabled, then
144+
// If the tablet is Not Serving due to e.g. being involved in a
145+
// Reshard where its QueryService is explicitly disabled, then
146146
// we should not fail the test as the throttler will not be Open.
147147
tabletBody := getHTTPBody(tabletURL)
148148
class := strings.ToLower(gjson.Get(tabletBody, "0.Class").String())
149149
value := strings.ToLower(gjson.Get(tabletBody, "0.Value").String())
150150
if class == "unhappy" && strings.Contains(value, "primary: not serving") {
151-
log.Infof("tablet %s is PRIMARY Not Serving, so ignoring throttler status as the throttler will not be Opened", tablet.Alias)
151+
log.Infof("tablet %s is Not Serving, so ignoring throttler status as the throttler will not be Opened", tablet.Alias)
152152
return
153153
}
154154
select {

go/vt/vttablet/tabletserver/throttle/throttler.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,10 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
222222
return throttler
223223
}
224224

225-
// CheckIsReady checks if this throttler is ready to serve. If not, it returns an error
225+
// CheckIsReady checks if this throttler is ready to serve. If not, it returns
226+
// an error.
226227
func (throttler *Throttler) CheckIsReady() error {
227-
if throttler.IsEnabled() {
228+
if throttler.IsOpen() {
228229
// all good
229230
return nil
230231
}
@@ -317,36 +318,37 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb.
317318
}
318319

319320
func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool {
321+
throttler.initMutex.Lock()
322+
defer throttler.initMutex.Unlock()
320323
log.Infof("Throttler: WatchSrvKeyspaceCallback called with: %+v", srvks)
321324
if err != nil {
322325
log.Errorf("WatchSrvKeyspaceCallback error: %v", err)
323326
return false
324327
}
325328
throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig)
326329

327-
if throttler.IsEnabled() {
328-
// Throttler is running and we should apply the config change through Operate()
329-
// or else we get into race conditions.
330+
if throttler.IsOpen() {
331+
// Throttler is open/running and we should apply the config change
332+
// through Operate() or else we get into race conditions.
330333
go func() {
331334
log.Infof("Throttler: submitting a throttler config apply message with: %+v", throttlerConfig)
332335
throttler.throttlerConfigChan <- throttlerConfig
333336
}()
334337
} else {
335-
// throttler is not running, we should apply directly
338+
// Throttler is not open/running, we should apply directly.
336339
throttler.applyThrottlerConfig(context.Background(), throttlerConfig)
337340
}
338341

339342
return true
340343
}
341344

342-
// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration. This may cause
343-
// the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
345+
// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration.
346+
// This may cause the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
347+
// Note: you should be holding the initMutex when calling this function.
344348
func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerConfig *topodatapb.ThrottlerConfig) {
345349
if !throttlerConfigViaTopo {
346350
return
347351
}
348-
throttler.initMutex.Lock()
349-
defer throttler.initMutex.Unlock()
350352
log.Infof("Throttler: applying topo config: %+v", throttlerConfig)
351353
if throttlerConfig.CustomQuery == "" {
352354
throttler.metricsQuery.Store(sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecardb.GetIdentifier()).Query)
@@ -366,6 +368,10 @@ func (throttler *Throttler) IsEnabled() bool {
366368
return atomic.LoadInt64(&throttler.isEnabled) > 0
367369
}
368370

371+
func (throttler *Throttler) IsOpen() bool {
372+
return atomic.LoadInt64(&throttler.isOpen) > 0
373+
}
374+
369375
// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
370376
// the collected metrics.
371377
func (throttler *Throttler) Enable(ctx context.Context) bool {
@@ -421,7 +427,7 @@ func (throttler *Throttler) Open() error {
421427
log.Infof("Throttler: started execution of Open. Acquiring initMutex lock")
422428
throttler.initMutex.Lock()
423429
defer throttler.initMutex.Unlock()
424-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
430+
if throttler.IsOpen() {
425431
// already open
426432
log.Infof("Throttler: throttler is already open")
427433
return nil
@@ -453,15 +459,17 @@ func (throttler *Throttler) Open() error {
453459
retryTicker := time.NewTicker(retryInterval)
454460
defer retryTicker.Stop()
455461
for {
456-
if atomic.LoadInt64(&throttler.isOpen) == 0 {
457-
// closed down. No need to keep retrying
462+
if !throttler.IsOpen() {
463+
// Throttler is not open/running so no need to keep retrying.
458464
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
459465
return
460466
}
461467

462468
throttlerConfig, err := throttler.readThrottlerConfig(ctx)
463469
if err == nil {
464470
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
471+
throttler.initMutex.Lock()
472+
defer throttler.initMutex.Unlock()
465473
// It's possible that during a retry-sleep, the throttler is closed and opened again, leading
466474
// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
467475
// attempt to read the throttler config; but we just want to ensure they don't step on each other
@@ -490,7 +498,7 @@ func (throttler *Throttler) Close() {
490498
throttler.initMutex.Lock()
491499
log.Infof("Throttler: acquired initMutex lock")
492500
defer throttler.initMutex.Unlock()
493-
if atomic.LoadInt64(&throttler.isOpen) == 0 {
501+
if !throttler.IsOpen() {
494502
log.Infof("Throttler: throttler is not open")
495503
return
496504
}
@@ -577,7 +585,6 @@ func (throttler *Throttler) isDormant() bool {
577585
// Operate is the main entry point for the throttler operation and logic. It will
578586
// run the probes, collect metrics, refresh inventory, etc.
579587
func (throttler *Throttler) Operate(ctx context.Context) {
580-
581588
tickers := [](*timer.SuspendableTicker){}
582589
addTicker := func(d time.Duration) *timer.SuspendableTicker {
583590
t := timer.NewSuspendableTicker(d, false)
@@ -612,7 +619,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
612619

613620
// sparse
614621
shouldBeLeader := int64(0)
615-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
622+
if throttler.IsOpen() {
616623
if throttler.tabletTypeFunc() == topodatapb.TabletType_PRIMARY {
617624
shouldBeLeader = 1
618625
}
@@ -638,7 +645,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
638645
}
639646
case <-mysqlCollectTicker.C:
640647
{
641-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
648+
if throttler.IsOpen() {
642649
// frequent
643650
if !throttler.isDormant() {
644651
throttler.collectMySQLMetrics(ctx)
@@ -647,7 +654,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
647654
}
648655
case <-mysqlDormantCollectTicker.C:
649656
{
650-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
657+
if throttler.IsOpen() {
651658
// infrequent
652659
if throttler.isDormant() {
653660
throttler.collectMySQLMetrics(ctx)
@@ -662,7 +669,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
662669
case <-mysqlRefreshTicker.C:
663670
{
664671
// sparse
665-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
672+
if throttler.IsOpen() {
666673
go throttler.refreshMySQLInventory(ctx)
667674
}
668675
}
@@ -673,13 +680,13 @@ func (throttler *Throttler) Operate(ctx context.Context) {
673680
}
674681
case <-mysqlAggregateTicker.C:
675682
{
676-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
683+
if throttler.IsOpen() {
677684
throttler.aggregateMySQLMetrics(ctx)
678685
}
679686
}
680687
case <-throttledAppsTicker.C:
681688
{
682-
if atomic.LoadInt64(&throttler.isOpen) > 0 {
689+
if throttler.IsOpen() {
683690
go throttler.expireThrottledApps()
684691
}
685692
}
@@ -1035,7 +1042,7 @@ func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName
10351042

10361043
// checkStore checks the aggregated value of given MySQL store
10371044
func (throttler *Throttler) checkStore(ctx context.Context, appName string, storeName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) {
1038-
if !throttler.IsEnabled() {
1045+
if !throttler.IsOpen() || !throttler.IsEnabled() {
10391046
return okMetricCheckResult
10401047
}
10411048
if throttlerapp.ExemptFromChecks(appName) {

0 commit comments

Comments
 (0)