Skip to content

Commit 38456f7

Browse files
committed
feat(beacon_chain): add stop mechanism to EthereumBeaconChain
test(beacon_chain): add tests for concurrent stop and nil wallclock scenarios Adds a `Stop` method to the `EthereumBeaconChain` to allow for graceful shutdown of the background goroutines. This prevents goroutine leaks when the beacon chain is no longer needed. Adds tests to ensure that calling `Stop` concurrently with callback registration does not cause a race condition or panic. Adds a test to specifically address a production issue where calling `OnEpochChanged` on a nil `EthereumBeaconChain` receiver caused a panic. This test simulates the scenario where the wallclock becomes nil and verifies that the expected panic occurs, highlighting the need for nil checks before calling methods on the wallclock.
1 parent 37b96a3 commit 38456f7

2 files changed

Lines changed: 140 additions & 4 deletions

File tree

beacon_chain.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type EthereumBeaconChain struct {
1515

1616
slotCh chan struct{}
1717
epochCh chan struct{}
18+
stopCh chan struct{}
19+
stopped bool
1820
}
1921

2022
func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, slotsPerEpoch uint64) *EthereumBeaconChain {
@@ -27,13 +29,17 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
2729

2830
slotCh: make(chan struct{}),
2931
epochCh: make(chan struct{}),
32+
stopCh: make(chan struct{}),
33+
stopped: false,
3034
}
3135

3236
go func() {
3337
for {
3438
select {
3539
case <-e.slotCh:
3640
return
41+
case <-e.stopCh:
42+
return
3743
default:
3844
slot := e.slots.Current()
3945

@@ -43,6 +49,13 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
4349

4450
// Take a read lock and copy the callbacks.
4551
e.mu.RLock()
52+
53+
if e.stopped {
54+
e.mu.RUnlock()
55+
56+
return
57+
}
58+
4659
callbacks := make([]func(current Slot), len(e.slotChangedCallbacks))
4760
copy(callbacks, e.slotChangedCallbacks)
4861
e.mu.RUnlock()
@@ -60,6 +73,8 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
6073
select {
6174
case <-e.epochCh:
6275
return
76+
case <-e.stopCh:
77+
return
6378
default:
6479
epoch := e.epochs.Current()
6580

@@ -69,6 +84,13 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
6984

7085
// Take a read lock and copy the callbacks.
7186
e.mu.RLock()
87+
88+
if e.stopped {
89+
e.mu.RUnlock()
90+
91+
return
92+
}
93+
7294
callbacks := make([]func(current Epoch), len(e.epochChangedCallbacks))
7395
copy(callbacks, e.epochChangedCallbacks)
7496
e.mu.RUnlock()
@@ -108,19 +130,56 @@ func (e *EthereumBeaconChain) Epochs() *DefaultEpochCreator {
108130

109131
func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) {
110132
e.mu.Lock()
133+
defer e.mu.Unlock()
134+
135+
if e.stopped {
136+
return
137+
}
138+
111139
e.epochChangedCallbacks = append(e.epochChangedCallbacks, callback)
112-
e.mu.Unlock()
113140
}
114141

115142
func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) {
116143
e.mu.Lock()
144+
defer e.mu.Unlock()
145+
146+
if e.stopped {
147+
return
148+
}
149+
117150
e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback)
118-
e.mu.Unlock()
119151
}
120152

121153
func (e *EthereumBeaconChain) Stop() {
122-
e.slotCh <- struct{}{}
123-
e.epochCh <- struct{}{}
154+
e.mu.Lock()
155+
156+
if e.stopped {
157+
e.mu.Unlock()
158+
159+
return
160+
}
161+
162+
e.stopped = true
163+
e.mu.Unlock()
164+
165+
close(e.stopCh)
166+
167+
// Send a signal to the other channels, but don't close them yet
168+
// to avoid "send on closed channel" panics from any other goroutines.
169+
select {
170+
case e.slotCh <- struct{}{}:
171+
default:
172+
}
173+
174+
select {
175+
case e.epochCh <- struct{}{}:
176+
default:
177+
}
178+
179+
// Small delay to allow goroutines to exit
180+
time.Sleep(100 * time.Millisecond)
181+
182+
// Now safe to close
124183
close(e.slotCh)
125184
close(e.epochCh)
126185
}

beacon_chain_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@ import (
66
"time"
77
)
88

9+
// MetadataService mocks the metadata service we use across xatu.
10+
type MetadataService struct {
11+
wallclock *EthereumBeaconChain
12+
}
13+
14+
// Wallclock returns the wallclock instance (can be nil).
15+
func (m *MetadataService) Wallclock() *EthereumBeaconChain {
16+
return m.wallclock
17+
}
18+
919
func TestBeaconChainEventCallbacks(t *testing.T) {
1020
beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)
1121

@@ -46,3 +56,70 @@ func TestBeaconChainEventCallbacks(t *testing.T) {
4656

4757
beacon.Stop()
4858
}
59+
60+
// TestConcurrentStopAndCallback tests that there's no race condition
61+
// between stopping the beacon chain and registering/executing callbacks.
62+
func TestConcurrentStopAndCallback(t *testing.T) {
63+
beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)
64+
65+
// Set up a sync WaitGroup to coordinate goroutines.
66+
var wg sync.WaitGroup
67+
68+
// Start multiple goroutines that try to register callbacks.
69+
for i := 0; i < 10; i++ {
70+
wg.Add(1)
71+
go func(id int) {
72+
defer wg.Done()
73+
// Try to register a callback - should not panic even if Stop is called concurrently.
74+
beacon.OnEpochChanged(func(epoch Epoch) {})
75+
}(i)
76+
}
77+
78+
// Start a goroutine that stops the beacon chain.
79+
wg.Add(1)
80+
go func() {
81+
defer wg.Done()
82+
// Small delay to increase chance of concurrent execution
83+
time.Sleep(5 * time.Millisecond)
84+
beacon.Stop()
85+
}()
86+
87+
// Wait for all goroutines to finish.
88+
wg.Wait()
89+
}
90+
91+
// TestNilWallclockScenario specifically tests for the panic seen in production:
92+
// when OnEpochChanged is called on a nil receiver.
93+
func TestNilWallclockScenario(t *testing.T) {
94+
// Create a metadata service with a valid wallclock
95+
metadata := &MetadataService{
96+
wallclock: NewEthereumBeaconChain(time.Now(), time.Second*1, 2),
97+
}
98+
99+
wc := metadata.Wallclock()
100+
if wc == nil {
101+
t.Fatal("Wallclock should not be nil")
102+
}
103+
104+
// Register a callback.
105+
wc.OnEpochChanged(func(epoch Epoch) {})
106+
107+
// If the beacon chain connection fails/is-lost, the wallclock becomes nil.
108+
// Subsequent callbacks then attempt to call the nil wallclock, which panics.
109+
metadata.wallclock = nil
110+
111+
shouldPanic := func() {
112+
defer func() {
113+
if r := recover(); r == nil {
114+
t.Error("Expected panic when using nil wallclock, but no panic occurred")
115+
} else {
116+
t.Logf("Got expected panic: %v", r)
117+
}
118+
}()
119+
120+
wc := metadata.Wallclock() // Get nil wallclock.
121+
wc.OnEpochChanged(func(epoch Epoch) {})
122+
}
123+
124+
shouldPanic()
125+
}

0 commit comments

Comments
 (0)