Skip to content

Commit a7ac803

Browse files
authored
Beacon sync maint update (#3636)
* Re-implement the config hook inside the `BeaconSyncRef` descriptor why The implementation on PR #3619 relied on a global variable to be set before the constructor was called. This not gc-safe in general (see remarks on PR #3619.) * Remove `shouldRun()` from syncer why The decision whether running the syncer will have any effect at all should be made only within the function that sets up the syncer. There is no need for an external register (i.e. `ctx.shouldRun`.) * Fix header records list `metrics` race condition why By using list manipulation functions directly (e.g. ctx.hdr.staged.clear() without updating `metrics`) there were odd metrics states displayed. The `header` module now always uses wrappers for changing the header records list. These wrappers update the `metrics` variable accordingly. * Fix potential block records list `metrics` race condition why The `block` module now always uses wrappers for changing the block records list. These wrappers update the `metrics` variable accordingly.
1 parent 4597b4e commit a7ac803

File tree

9 files changed

+96
-68
lines changed

9 files changed

+96
-68
lines changed

execution_chain/nimbus_execution_client.nim

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,20 +114,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
114114
# Add protocol capabilities
115115
nimbus.wire = nimbus.ethNode.addEthHandlerCapability(nimbus.txPool)
116116

117-
# Always initialise beacon syncer. It might turn out that it will not
118-
# be started if there will be no point in doing so.
119-
nimbus.beaconSyncRef = BeaconSyncRef.init(
120-
nimbus.ethNode, nimbus.fc, conf.maxPeers,
121-
conf.engineApiServerEnabled())
122-
123-
# Optional for pre-setting the sync target (e.g. for debugging)
124-
if conf.beaconSyncTarget.isSome():
125-
let hex = conf.beaconSyncTarget.unsafeGet
126-
if not nimbus.beaconSyncRef.targetInit(hex, conf.beaconSyncTargetIsFinal):
127-
fatal "Error parsing --debug-beacon-sync-target hash32 argument",
128-
hash32=hex
129-
quit QuitFailure
130-
131117
# Connect directly to the static nodes
132118
let staticPeers = conf.getStaticPeers()
133119
if staticPeers.len > 0:
@@ -147,6 +133,34 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
147133
enableDiscV5 = DiscoveryType.V5 in discovery,
148134
)
149135

136+
# Initalise beacon sync descriptor.
137+
var syncerShouldRun = (conf.maxPeers > 0 or staticPeers.len > 0) and
138+
conf.engineApiServerEnabled()
139+
140+
# The beacon sync descriptor might have been pre-allocated with additional
141+
# features. So do not override.
142+
if nimbus.beaconSyncRef.isNil:
143+
nimbus.beaconSyncRef = BeaconSyncRef.init()
144+
else:
145+
syncerShouldRun = true
146+
147+
# Configure beacon syncer.
148+
nimbus.beaconSyncRef.config(nimbus.ethNode, nimbus.fc, conf.maxPeers)
149+
150+
# Optional for pre-setting the sync target (e.g. for debugging)
151+
if conf.beaconSyncTarget.isSome():
152+
syncerShouldRun = true
153+
let hex = conf.beaconSyncTarget.unsafeGet
154+
if not nimbus.beaconSyncRef.configTarget(hex, conf.beaconSyncTargetIsFinal):
155+
fatal "Error parsing hash32 argument for --debug-beacon-sync-target",
156+
hash32=hex
157+
quit QuitFailure
158+
159+
# Deactivating syncer if there is definitely no need to run it. This
160+
# avoids polling (i.e. waiting for instructions) and some logging.
161+
if not syncerShouldRun:
162+
nimbus.beaconSyncRef = BeaconSyncRef(nil)
163+
150164
proc setupMetrics(nimbus: NimbusNode, conf: NimbusConf)
151165
{.raises: [CancelledError, MetricsError].} =
152166
# metrics logging
@@ -282,7 +296,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
282296

283297
# Not starting syncer if there is definitely no way to run it. This
284298
# avoids polling (i.e. waiting for instructions) and some logging.
285-
if not nimbus.beaconSyncRef.shouldRun() or
299+
if not nimbus.beaconSyncRef.isNil and
286300
not nimbus.beaconSyncRef.start():
287301
nimbus.beaconSyncRef = BeaconSyncRef(nil)
288302

execution_chain/sync/beacon.nim

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ export
2525
logScope:
2626
topics = "beacon sync"
2727

28-
var beaconSyncConfigHook = BeaconSyncConfigHook(nil)
29-
## Optional configuration request hook. This must be initialised before
30-
## the `BeaconSyncRef.init()` constructor is called to be effective.
31-
3228
# ------------------------------------------------------------------------------
3329
# Virtual methods/interface, `mixin` functions
3430
# ------------------------------------------------------------------------------
@@ -61,55 +57,52 @@ proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
6157
# Public functions
6258
# ------------------------------------------------------------------------------
6359

64-
proc config*(
65-
_: type BeaconSyncRef;
66-
configCB: BeaconSyncConfigHook;
67-
): BeaconSyncConfigHook =
68-
## Set config hook to be run by the `BeaconSyncRef.init()` constructor
69-
## request. If activated, it will alse make `shouldRun()` return `true`.
70-
## The function returns the previous hook if there was any, or `nil`.
71-
##
72-
var oldHook = beaconSyncConfigHook
73-
beaconSyncConfigHook = configCB
74-
move oldHook
75-
7660
proc init*(
7761
T: type BeaconSyncRef;
62+
configCB = BeaconSyncConfigHook(nil);
63+
): T =
64+
## Constructor
65+
##
66+
## The `configCB` allows to specify a final configuration task to be run at
67+
## the end of the `config()` function.
68+
##
69+
T(lazyConfigHook: configCB)
70+
71+
proc config*(
72+
desc: BeaconSyncRef;
7873
ethNode: EthereumNode;
7974
chain: ForkedChainRef;
8075
maxPeers: int;
81-
haveEngine: bool;
82-
): T =
83-
var desc = T()
76+
) =
77+
## Complete `BeaconSyncRef` descriptor initialisation.
78+
##
79+
## Note that the `init()` constructor might have specified a configuration
80+
## task to be run at the end of the `config()` function.
81+
##
82+
doAssert desc.ctx.isNil # This can only run once
8483
desc.initSync(ethNode, maxPeers)
8584
desc.ctx.pool.chain = chain
86-
desc.ctx.shouldRun = (0 < maxPeers and haveEngine)
87-
88-
if not beaconSyncConfigHook.isNil:
89-
beaconSyncConfigHook(desc)
90-
beaconSyncConfigHook = nil
91-
desc.ctx.shouldRun = true
92-
93-
desc
9485

95-
proc shouldRun*(desc: BeaconSyncRef): bool =
96-
## Getter
97-
desc.ctx.shouldRun
86+
if not desc.lazyConfigHook.isNil:
87+
desc.lazyConfigHook(desc)
88+
desc.lazyConfigHook = nil
9889

99-
proc targetInit*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
90+
proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
10091
## Set up inital target sprint (if any, mainly for debugging)
92+
doAssert not desc.ctx.isNil
10193
try:
10294
desc.ctx.headersTargetRequest(Hash32.fromHex(hex), isFinal, "init")
103-
desc.ctx.shouldRun = true
10495
return true
10596
except ValueError:
10697
discard
10798
# false
10899

109100
proc start*(desc: BeaconSyncRef): bool =
101+
doAssert not desc.ctx.isNil
110102
desc.startSync()
111103

112104
proc stop*(desc: BeaconSyncRef) {.async.} =
105+
doAssert not desc.ctx.isNil
113106
await desc.stopSync()
114107

115108
# ------------------------------------------------------------------------------

execution_chain/sync/beacon/beacon_desc.nim

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ export
1818
sync_desc, worker_desc
1919

2020
type
21-
BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData]
22-
2321
BeaconSyncConfigHook* = proc(desc: BeaconSyncRef) {.gcsafe, raises: [].}
2422
## Conditional configuration request hook
2523

24+
BeaconSyncRef* = ref object of RunnerSyncRef[BeaconCtxData,BeaconBuddyData]
25+
## Instance descriptor, extends scheduler object
26+
lazyConfigHook*: BeaconSyncConfigHook
27+
2628
# End

execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,12 @@ template blocksCollect*(
154154
let
155155
# Insert blocks list on the `staged` queue
156156
key = rc.value[0].header.number
157-
qItem = ctx.blk.staged.insert(key).valueOr:
157+
qItem = ctx.blocksStagedQueueInsert(key).valueOr:
158158
raiseAssert info & ": duplicate key on staged queue iv=" &
159159
(key, rc.value[^1].header.number).bnStr
160160

161161
qItem.data.blocks = rc.value # store `blocks[]` list
162162
qItem.data.peerID = buddy.peerID
163-
164-
ctx.blocksStagedQueueMetricsUpdate() # metrics
165163
nQueued += rc.value.len # statistics
166164
# End if
167165

@@ -254,8 +252,7 @@ template blocksUnstage*(
254252
break
255253

256254
# Remove from queue
257-
discard ctx.blk.staged.delete qItem.key
258-
ctx.blocksStagedQueueMetricsUpdate() # metrics
255+
ctx.blocksStagedQueueDelete qItem.key
259256

260257
# Import blocks list, async/template
261258
nImported += buddy.blocksImport(qItem.data.blocks,qItem.data.peerID, info)

execution_chain/sync/beacon/worker/blocks/blocks_queue.nim

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
{.push raises:[].}
1212

1313
import
14-
pkg/[eth/common, metrics],
14+
pkg/[eth/common, metrics, results],
1515
pkg/stew/[interval_set, sorted_set],
1616
../worker_desc
1717

@@ -34,7 +34,20 @@ func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
3434
## `true` iff no data are on the queue.
3535
ctx.blk.staged.len == 0
3636

37-
proc blocksStagedQueueMetricsUpdate*(ctx: BeaconCtxRef) =
37+
proc blocksStagedQueueInsert*(
38+
ctx: BeaconCtxRef;
39+
key: BlockNumber;
40+
): Opt[SortedSetItemRef[BlockNumber,BlocksForImport]] =
41+
let qItem = ctx.blk.staged.insert(key).valueOr:
42+
return err()
43+
metrics.set(nec_sync_block_lists_staged, ctx.blk.staged.len)
44+
ok(qItem)
45+
46+
proc blocksStagedQueueDelete*(
47+
ctx: BeaconCtxRef;
48+
key: BlockNumber;
49+
) =
50+
discard ctx.blk.staged.delete(key)
3851
metrics.set(nec_sync_block_lists_staged, ctx.blk.staged.len)
3952

4053
# ----------------
@@ -43,6 +56,7 @@ proc blocksStagedQueueClear*(ctx: BeaconCtxRef) =
4356
## Clear queue
4457
ctx.blk.staged.clear()
4558
ctx.blk.reserveStaged = 0
59+
metrics.set(nec_sync_block_lists_staged, 0)
4660

4761
proc blocksStagedQueueInit*(ctx: BeaconCtxRef) =
4862
## Constructor

execution_chain/sync/beacon/worker/headers.nim

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,11 @@ template headersCollect*(buddy: BeaconBuddyRef; info: static[string]) =
162162
let
163163
# Insert headers list on the `staged` queue
164164
key = rc.value[0].number
165-
qItem = ctx.hdr.staged.insert(key).valueOr:
165+
qItem = ctx.headersStagedQueueInsert(key).valueOr:
166166
raiseAssert info & ": duplicate key on staged queue" &
167167
" iv=" & (rc.value[^1].number,key).bnStr
168168
qItem.data.revHdrs = rc.value
169169
qItem.data.peerID = buddy.peerID
170-
171-
ctx.headersStagedQueueMetricsUpdate() # metrics
172170
nQueued = rc.value.len # statistics
173171
# End if
174172

@@ -247,8 +245,7 @@ proc headersUnstage*(buddy: BeaconBuddyRef; info: static[string]): bool =
247245
break
248246

249247
# Remove from queue
250-
discard ctx.hdr.staged.delete(qItem.key)
251-
ctx.headersStagedQueueMetricsUpdate() # metrics
248+
ctx.headersStagedQueueDelete(qItem.key)
252249

253250
# Store headers on database
254251
let nHdrs = buddy.headersStashOnDisk(
@@ -285,7 +282,7 @@ proc headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
285282
nUnproc=ctx.headersUnprocTotal(), nStagedQ=ctx.hdr.staged.len
286283

287284
ctx.headersUnprocClear() # clears `unprocessed` and `borrowed` list
288-
ctx.hdr.staged.clear()
285+
ctx.headersStagedQueueClear()
289286
ctx.subState.reset
290287

291288
# ------------------------------------------------------------------------------

execution_chain/sync/beacon/worker/headers/headers_queue.nim

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
{.push raises:[].}
1212

1313
import
14-
pkg/[eth/common, metrics],
14+
pkg/[eth/common, metrics, results],
1515
pkg/stew/[interval_set, sorted_set],
1616
../worker_desc
1717

@@ -34,7 +34,20 @@ func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
3434
## `true` iff no data are on the queue.
3535
ctx.hdr.staged.len == 0
3636

37-
proc headersStagedQueueMetricsUpdate*(ctx: BeaconCtxRef) =
37+
proc headersStagedQueueInsert*(
38+
ctx: BeaconCtxRef;
39+
key: BlockNumber;
40+
): Opt[SortedSetItemRef[BlockNumber,LinkedHChain]] =
41+
let qItem = ctx.hdr.staged.insert(key).valueOr:
42+
return err()
43+
metrics.set(nec_sync_header_lists_staged, ctx.hdr.staged.len)
44+
ok(qItem)
45+
46+
proc headersStagedQueueDelete*(
47+
ctx: BeaconCtxRef;
48+
key: BlockNumber;
49+
) =
50+
discard ctx.hdr.staged.delete(key)
3851
metrics.set(nec_sync_header_lists_staged, ctx.hdr.staged.len)
3952

4053
# ----------------

execution_chain/sync/sync_desc.nim

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type
4545
poolMode*: bool ## Activate `runPool()` workers if set `true`
4646
daemon*: bool ## Enable global background job
4747
pool*: S ## Shared context for all worker peers
48-
shouldRun*: bool ## Config suggests starting the scheduler
4948

5049
# ------------------------------------------------------------------------------
5150
# Public functions

execution_chain/sync/sync_sched.nim

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,15 @@ import
9393
./[sync_desc, wire_protocol]
9494

9595
type
96-
ActiveBuddies[S,W] = ##\
96+
ActiveBuddies[S,W] = KeyedQueue[ENode,RunnerBuddyRef[S,W]]
9797
## List of active workers, using `Hash(Peer)` rather than `Peer`
98-
KeyedQueue[ENode,RunnerBuddyRef[S,W]]
9998

10099
RunCtrl = enum
101100
terminated = 0
102101
shutdown
103102
running
104103

105-
RunnerSyncRef*[S,W] = ref object
104+
RunnerSyncRef*[S,W] = ref object of RootRef
106105
## Module descriptor
107106
ctx*: CtxRef[S] ## Shared data
108107
pool: PeerPool ## For starting the system

0 commit comments

Comments
 (0)