File tree Expand file tree Collapse file tree
heartbeat/scheduler/timerqueue Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -88,7 +88,7 @@ func (tq *TimerQueue) Start() {
8888 if tq .th .Len () > 0 {
8989 nr := tq .th [0 ].runAt
9090 tq .nextRunAt = & nr
91- tq .timer . Reset ( nr . Sub ( time .Now () ))
91+ tq .timer = time . NewTimer ( time .Until ( nr ))
9292 } else {
9393 tq .timer .Stop ()
9494 tq .nextRunAt = nil
@@ -104,14 +104,21 @@ func (tq *TimerQueue) pushInternal(tt *timerTask) {
104104 heap .Push (& tq .th , tt )
105105
106106 if tq .nextRunAt == nil || tq .nextRunAt .After (tt .runAt ) {
107- // Stop and drain the timer prior to reset per https://golang.org/pkg/time/#Timer.Reset
108- // Only drain if nextRunAt is set, otherwise the timer channel has already been stopped the
109- // channel is empty (and thus would block)
110107 if tq .nextRunAt != nil && ! tq .timer .Stop () {
111108 <- tq .timer .C
112109 }
113- tq .timer .Reset (tt .runAt .Sub (time .Now ()))
114-
110+ // Originally the line below this comment was
111+ //
112+ // tq.timer.Reset(time.Until(tt.runAt))
113+ //
114+ // however this broke in go1.16rc1, specifically on the commit b4b014465216790e01aa66f9120d03230e4aff46
115+ //, specifically on this line:
116+ // https://github.com/golang/go/commit/b4b014465216790e01aa66f9120d03230e4aff46#diff-73699b6edfe5dbb3f6824e66bb3566bce9405e9a8c810cac55c8199459f0ac19R652
117+ // where some nice new optimizations don't actually work reliably
118+ // This can be worked around by instantiating a new timer rather than resetting the timer.
119+ // since that internally calls deltimer in runtime/timer.go rather than modtimer,
120+ // I suspect that the problem is in modtimer's setting of &pp.timerModifiedEarliest
121+ tq .timer = time .NewTimer (time .Until (tt .runAt ))
115122 tq .nextRunAt = & tt .runAt
116123 }
117124}
Original file line number Diff line number Diff line change @@ -20,18 +20,41 @@ package timerqueue
2020import (
2121 "context"
2222 "math/rand"
23+ "os"
24+ "runtime/pprof"
2325 "sort"
2426 "testing"
2527 "time"
2628
2729 "github.com/stretchr/testify/require"
2830)
2931
30- func TestQueueRunsInOrder (t * testing.T ) {
31- t .Skip ("flaky test on windows: https://github.com/elastic/beats/issues/26205" )
32- // Bugs can show up only occasionally
33- for i := 0 ; i < 100 ; i ++ {
34- testQueueRunsInOrderOnce (t )
32+ func TestRunsInOrder (t * testing.T ) {
33+ testQueueRunsInOrderOnce (t )
34+ }
35+
36+ // TestStress tries to figure out if we have any deadlocks that show up under concurrency
37+ func TestStress (t * testing.T ) {
38+ for i := 0 ; i < 120000 ; i ++ {
39+ failed := make (chan bool )
40+ succeeded := make (chan bool )
41+
42+ watchdog := time .AfterFunc (time .Second * 5 , func () {
43+ failed <- true
44+ })
45+
46+ go func () {
47+ testQueueRunsInOrderOnce (t )
48+ succeeded <- true
49+ }()
50+
51+ select {
52+ case <- failed :
53+ pprof .Lookup ("goroutine" ).WriteTo (os .Stdout , 1 )
54+ require .FailNow (t , "Scheduler test iteration timed out, deadlock issue?" )
55+ case <- succeeded :
56+ watchdog .Stop ()
57+ }
3558 }
3659}
3760
You can’t perform that action at this time.
0 commit comments