@@ -222,9 +222,10 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
222222 return throttler
223223}
224224
225- // CheckIsReady checks if this throttler is ready to serve. If not, it returns an error
225+ // CheckIsReady checks if this throttler is ready to serve. If not, it returns
226+ // an error.
226227func (throttler * Throttler ) CheckIsReady () error {
227- if throttler .IsEnabled () {
228+ if throttler .IsOpen () {
228229 // all good
229230 return nil
230231 }
@@ -317,36 +318,37 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb.
317318}
318319
319320func (throttler * Throttler ) WatchSrvKeyspaceCallback (srvks * topodatapb.SrvKeyspace , err error ) bool {
321+ throttler .initMutex .Lock ()
322+ defer throttler .initMutex .Unlock ()
320323 log .Infof ("Throttler: WatchSrvKeyspaceCallback called with: %+v" , srvks )
321324 if err != nil {
322325 log .Errorf ("WatchSrvKeyspaceCallback error: %v" , err )
323326 return false
324327 }
325328 throttlerConfig := throttler .normalizeThrottlerConfig (srvks .ThrottlerConfig )
326329
327- if throttler .IsEnabled () {
328- // Throttler is running and we should apply the config change through Operate()
329- // or else we get into race conditions.
330+ if throttler .IsOpen () {
331+ // Throttler is open/ running and we should apply the config change
332+ // through Operate() or else we get into race conditions.
330333 go func () {
331334 log .Infof ("Throttler: submitting a throttler config apply message with: %+v" , throttlerConfig )
332335 throttler .throttlerConfigChan <- throttlerConfig
333336 }()
334337 } else {
335- // throttler is not running, we should apply directly
338+ // Throttler is not open/ running, we should apply directly.
336339 throttler .applyThrottlerConfig (context .Background (), throttlerConfig )
337340 }
338341
339342 return true
340343}
341344
342- // applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration. This may cause
343- // the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
345+ // applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration.
346+ // This may cause the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
347+ // Note: you should be holding the initMutex when calling this function.
344348func (throttler * Throttler ) applyThrottlerConfig (ctx context.Context , throttlerConfig * topodatapb.ThrottlerConfig ) {
345349 if ! throttlerConfigViaTopo {
346350 return
347351 }
348- throttler .initMutex .Lock ()
349- defer throttler .initMutex .Unlock ()
350352 log .Infof ("Throttler: applying topo config: %+v" , throttlerConfig )
351353 if throttlerConfig .CustomQuery == "" {
352354 throttler .metricsQuery .Store (sqlparser .BuildParsedQuery (defaultReplicationLagQuery , sidecardb .GetIdentifier ()).Query )
@@ -366,6 +368,10 @@ func (throttler *Throttler) IsEnabled() bool {
366368 return atomic .LoadInt64 (& throttler .isEnabled ) > 0
367369}
368370
371+ func (throttler * Throttler ) IsOpen () bool {
372+ return atomic .LoadInt64 (& throttler .isOpen ) > 0
373+ }
374+
369375// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
370376// the collected metrics.
371377func (throttler * Throttler ) Enable (ctx context.Context ) bool {
@@ -421,7 +427,7 @@ func (throttler *Throttler) Open() error {
421427 log .Infof ("Throttler: started execution of Open. Acquiring initMutex lock" )
422428 throttler .initMutex .Lock ()
423429 defer throttler .initMutex .Unlock ()
424- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
430+ if throttler .IsOpen () {
425431 // already open
426432 log .Infof ("Throttler: throttler is already open" )
427433 return nil
@@ -453,15 +459,17 @@ func (throttler *Throttler) Open() error {
453459 retryTicker := time .NewTicker (retryInterval )
454460 defer retryTicker .Stop ()
455461 for {
456- if atomic . LoadInt64 ( & throttler .isOpen ) == 0 {
457- // closed down. No need to keep retrying
462+ if ! throttler .IsOpen () {
463+ // Throttler is not open/running so no need to keep retrying.
458464 log .Errorf ("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting" )
459465 return
460466 }
461467
462468 throttlerConfig , err := throttler .readThrottlerConfig (ctx )
463469 if err == nil {
464470 log .Errorf ("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v" , throttlerConfig )
471+ throttler .initMutex .Lock ()
472+ defer throttler .initMutex .Unlock ()
465473 // It's possible that during a retry-sleep, the throttler is closed and opened again, leading
466474 // to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
467475 // attempt to read the throttler config; but we just want to ensure they don't step on each other
@@ -490,7 +498,7 @@ func (throttler *Throttler) Close() {
490498 throttler .initMutex .Lock ()
491499 log .Infof ("Throttler: acquired initMutex lock" )
492500 defer throttler .initMutex .Unlock ()
493- if atomic . LoadInt64 ( & throttler .isOpen ) == 0 {
501+ if ! throttler .IsOpen () {
494502 log .Infof ("Throttler: throttler is not open" )
495503 return
496504 }
@@ -577,7 +585,6 @@ func (throttler *Throttler) isDormant() bool {
577585// Operate is the main entry point for the throttler operation and logic. It will
578586// run the probes, collect metrics, refresh inventory, etc.
579587func (throttler * Throttler ) Operate (ctx context.Context ) {
580-
581588 tickers := [](* timer.SuspendableTicker ){}
582589 addTicker := func (d time.Duration ) * timer.SuspendableTicker {
583590 t := timer .NewSuspendableTicker (d , false )
@@ -612,7 +619,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
612619
613620 // sparse
614621 shouldBeLeader := int64 (0 )
615- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
622+ if throttler .IsOpen () {
616623 if throttler .tabletTypeFunc () == topodatapb .TabletType_PRIMARY {
617624 shouldBeLeader = 1
618625 }
@@ -638,7 +645,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
638645 }
639646 case <- mysqlCollectTicker .C :
640647 {
641- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
648+ if throttler .IsOpen () {
642649 // frequent
643650 if ! throttler .isDormant () {
644651 throttler .collectMySQLMetrics (ctx )
@@ -647,7 +654,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
647654 }
648655 case <- mysqlDormantCollectTicker .C :
649656 {
650- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
657+ if throttler .IsOpen () {
651658 // infrequent
652659 if throttler .isDormant () {
653660 throttler .collectMySQLMetrics (ctx )
@@ -662,7 +669,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
662669 case <- mysqlRefreshTicker .C :
663670 {
664671 // sparse
665- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
672+ if throttler .IsOpen () {
666673 go throttler .refreshMySQLInventory (ctx )
667674 }
668675 }
@@ -673,13 +680,13 @@ func (throttler *Throttler) Operate(ctx context.Context) {
673680 }
674681 case <- mysqlAggregateTicker .C :
675682 {
676- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
683+ if throttler .IsOpen () {
677684 throttler .aggregateMySQLMetrics (ctx )
678685 }
679686 }
680687 case <- throttledAppsTicker .C :
681688 {
682- if atomic . LoadInt64 ( & throttler .isOpen ) > 0 {
689+ if throttler .IsOpen () {
683690 go throttler .expireThrottledApps ()
684691 }
685692 }
@@ -1035,7 +1042,7 @@ func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName
10351042
10361043// checkStore checks the aggregated value of given MySQL store
10371044func (throttler * Throttler ) checkStore (ctx context.Context , appName string , storeName string , remoteAddr string , flags * CheckFlags ) (checkResult * CheckResult ) {
1038- if ! throttler .IsEnabled () {
1045+ if ! throttler .IsOpen () || ! throttler . IsEnabled () {
10391046 return okMetricCheckResult
10401047 }
10411048 if throttlerapp .ExemptFromChecks (appName ) {
0 commit comments