Skip to content

Commit 702434e

Browse files
committed
fix: blockchain, span_store, finality data races
1 parent 3d71e61 commit 702434e

File tree

6 files changed

+63
-9
lines changed

6 files changed

+63
-9
lines changed

consensus/bor/span_store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type SpanStore struct {
3535

3636
chainId string
3737
lastUsedSpan atomic.Pointer[borTypes.Span]
38-
latestKnownSpanId uint64
38+
latestKnownSpanId atomic.Uint64
3939
heimdallStatus atomic.Pointer[ctypes.SyncInfo]
4040

4141
// cancel function to stop the background routine
@@ -222,8 +222,8 @@ func (s *SpanStore) spanById(ctx context.Context, spanId uint64) (*borTypes.Span
222222
}
223223

224224
s.store.Add(spanId, currentSpan)
225-
if currentSpan.Id > s.latestKnownSpanId {
226-
s.latestKnownSpanId = currentSpan.Id
225+
if currentSpan.Id > s.latestKnownSpanId.Load() {
226+
s.latestKnownSpanId.Store(currentSpan.Id)
227227
}
228228

229229
return currentSpan, nil

consensus/bor/span_store_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,3 +1332,38 @@ func TestSpanStore_WaitForNewSpan(t *testing.T) {
13321332
require.True(t, found)
13331333
})
13341334
}
1335+
1336+
func TestSpanStore_ConcurrentAccess(t *testing.T) {
1337+
spanStore := NewSpanStore(&MockHeimdallClient{}, nil, "1337")
1338+
defer spanStore.Close()
1339+
ctx := context.Background()
1340+
1341+
const numGoroutines = 20
1342+
var wg sync.WaitGroup
1343+
1344+
// Test concurrent access to spanById which updates the latestKnownSpanId.
1345+
for i := range numGoroutines {
1346+
wg.Add(1)
1347+
go func(id int) {
1348+
defer wg.Done()
1349+
spanId := uint64(id + 1)
1350+
span, err := spanStore.spanById(ctx, spanId)
1351+
require.NoError(t, err)
1352+
require.Equal(t, spanId, span.Id)
1353+
}(i)
1354+
}
1355+
1356+
wg.Wait()
1357+
1358+
// Verify that the final state is consistent.
1359+
finalLatestSpanId := spanStore.latestKnownSpanId.Load()
1360+
require.Equal(t, uint64(numGoroutines), finalLatestSpanId, "Latest span ID should be the highest processed span")
1361+
1362+
// Verify that all spans are accessible.
1363+
for i := range numGoroutines {
1364+
spanId := uint64(i + 1)
1365+
span, err := spanStore.spanById(ctx, spanId)
1366+
require.NoError(t, err)
1367+
require.Equal(t, spanId, span.Id)
1368+
}
1369+
}

core/blockchain.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ type BlockChain struct {
312312

313313
// Bor related changes
314314
borReceiptsCache *lru.Cache[common.Hash, *types.Receipt] // Cache for the most recent bor receipt receipts per block
315+
stateSyncMu sync.RWMutex // Mutex to protect the stateSyncData access
315316
stateSyncData []*types.StateSyncData // State sync data
316317
stateSyncFeed event.Feed // State sync feed
317318
chain2HeadFeed event.Feed // Reorg/NewHead/Fork data feed
@@ -2107,9 +2108,11 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
21072108
if emitHeadEvent {
21082109
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
21092110
// BOR state sync feed related changes
2111+
bc.stateSyncMu.RLock()
21102112
for _, data := range bc.stateSyncData {
21112113
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
21122114
}
2115+
bc.stateSyncMu.RUnlock()
21132116
// BOR
21142117
}
21152118
} else {
@@ -2707,9 +2710,11 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool,
27072710
}
27082711

27092712
// BOR state sync feed related changes
2713+
bc.stateSyncMu.RLock()
27102714
for _, data := range bc.stateSyncData {
27112715
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
27122716
}
2717+
bc.stateSyncMu.RUnlock()
27132718
// BOR
27142719
ptime := time.Since(pstart) - vtime - statedb.BorConsensusTime
27152720

core/blockchain_reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,10 +541,14 @@ type BorStateSyncer interface {
541541

542542
// SetStateSync set sync data in state_data
543543
func (bc *BlockChain) SetStateSync(stateData []*types.StateSyncData) {
544+
bc.stateSyncMu.Lock()
545+
defer bc.stateSyncMu.Unlock()
544546
bc.stateSyncData = stateData
545547
}
546548

547549
func (bc *BlockChain) GetStateSync() []*types.StateSyncData {
550+
bc.stateSyncMu.RLock()
551+
defer bc.stateSyncMu.RUnlock()
548552
return bc.stateSyncData
549553
}
550554

eth/downloader/whitelist/milestone.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,21 @@ func (m *milestone) IsValidPeer(fetchHeadersByNumber func(number uint64, amount
110110
}
111111

112112
// Fork detection: Check if we have a local fork - if so, allow sync to recover
113-
if m.blockchain != nil && m.doExist {
114-
localHead := m.blockchain.CurrentBlock()
115-
if localHead != nil && localHead.Number.Uint64() >= m.Number {
116-
localBlock := m.blockchain.GetBlockByNumber(m.Number)
117-
if localBlock != nil && localBlock.Hash() != m.Hash {
113+
// Read finality fields under lock protection to avoid race conditions
114+
m.finality.RLock()
115+
blockchain := m.blockchain
116+
doExist := m.doExist
117+
hash := m.Hash
118+
number := m.Number
119+
m.finality.RUnlock()
120+
121+
if blockchain != nil && doExist {
122+
localHead := blockchain.CurrentBlock()
123+
if localHead != nil && localHead.Number.Uint64() >= number {
124+
localBlock := blockchain.GetBlockByNumber(number)
125+
if localBlock != nil && localBlock.Hash() != hash {
118126
log.Info("Fork detected, allowing peer sync for recovery",
119-
"local", localBlock.Hash(), "milestone", m.Hash, "block", m.Number)
127+
"local", localBlock.Hash(), "milestone", hash, "block", number)
120128
MilestonePeerMeter.Mark(int64(1))
121129
return true, nil
122130
}

eth/downloader/whitelist/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ func NewService(db ethdb.Database) *Service {
8989
// SetBlockchain sets the blockchain reference for the milestone service
9090
func (s *Service) SetBlockchain(blockchain ChainReader) {
9191
if milestone, ok := s.milestoneService.(*milestone); ok {
92+
milestone.finality.Lock()
9293
milestone.blockchain = blockchain
94+
milestone.finality.Unlock()
9395
}
9496
}
9597

0 commit comments

Comments
 (0)