Skip to content

Commit 60e9892

Browse files
authored
Beacon sync maint update (#3421)
* Code cosmetics, docu/comment and logging updates, etc. details + renamed `*_staged/*_fetch.nim`-> `*_staged/staged_fetch.nim` + extracted helper functions into `*_staged/staged_helpers.nim` + remove some debugging statements from `update.nim` * Rename `headers_staged.nim` -> `headers.nim` and clean up sub-modules why Simplifies source file layout details Rename `headers_staged/staged_*.nim` -> `headers/headers_*.nim` * Rename `blocks_staged.nim` -> `blocks.nim` and clean up sub-modules why Simplifies source file layout details Rename `blocks_staged/staged_*.nim` -> `blocks/blocks_*.nim`
1 parent 3413847 commit 60e9892

22 files changed

Lines changed: 328 additions & 268 deletions

execution_chain/sync/beacon/worker.nim

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import
1717
pkg/stew/[interval_set, sorted_set],
1818
../../common,
1919
./worker/update/[metrics, ticker],
20-
./worker/[blocks_staged, headers_staged, start_stop, update],
20+
./worker/[blocks, headers, start_stop, update],
2121
./worker_desc
2222

2323
# ------------------------------------------------------------------------------
@@ -29,8 +29,8 @@ proc napUnlessSomethingToCollect(
2929
): Future[bool] {.async: (raises: []).} =
3030
## When idle, save cpu cycles waiting for something to do.
3131
if buddy.ctx.hibernate or # not activated yet?
32-
not (buddy.headersStagedCollectOk() or # something on TODO list
33-
buddy.blocksStagedCollectOk()):
32+
not (buddy.headersCollectOk() or # something on TODO list
33+
buddy.blocksCollectOk()):
3434
try:
3535
await sleepAsync workerIdleWaitInterval
3636
except CancelledError:
@@ -75,8 +75,7 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
7575
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
7676
## Clean up this peer
7777
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
78-
nSyncPeers=(buddy.ctx.pool.nBuddies-1), syncState=($buddy.syncState),
79-
nLaps=buddy.only.nMultiLoop, lastIdleGap=buddy.only.multiRunIdle.toStr
78+
nSyncPeers=(buddy.ctx.pool.nBuddies-1), syncState=($buddy.syncState)
8079
buddy.stopBuddy()
8180

8281
# --------------------
@@ -107,6 +106,7 @@ proc runTicker*(ctx: BeaconCtxRef; info: static[string]) =
107106
ctx.updateMetrics()
108107
ctx.updateTicker()
109108

109+
110110
proc runDaemon*(
111111
ctx: BeaconCtxRef;
112112
info: static[string];
@@ -124,10 +124,10 @@ proc runDaemon*(
124124
return
125125

126126
# Execute staged block records.
127-
if ctx.blocksStagedProcessOk():
127+
if ctx.blocksUnstageOk():
128128

129129
# Import bodies from the `staged` queue.
130-
discard await ctx.blocksStagedProcess info
130+
discard await ctx.blocksUnstage info
131131

132132
if not ctx.daemon or # Implied by external sync shutdown?
133133
ctx.poolMode: # Oops, re-org needed?
@@ -170,48 +170,40 @@ proc runPeer*(
170170
## This peer worker method is repeatedly invoked (exactly one per peer) while
171171
## the `buddy.ctrl.poolMode` flag is set `false`.
172172
##
173-
if 0 < buddy.only.nMultiLoop: # statistics/debugging
174-
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
175-
buddy.only.nMultiLoop.inc # statistics/debugging
176-
177173
if not await buddy.napUnlessSomethingToCollect():
178174

179175
# Download and process headers and blocks
180-
while buddy.headersStagedCollectOk():
176+
while buddy.headersCollectOk():
181177

182178
# Collect headers and either stash them on the header chain cache
183179
# directly, or stage on the header queue to get them serialised and
184180
# stashed, later.
185-
await buddy.headersStagedCollect info
181+
await buddy.headersCollect info
186182

187183
# Store serialised headers from the `staged` queue onto the header
188184
# chain cache.
189-
if not buddy.headersStagedProcess info:
185+
if not buddy.headersUnstage info:
190186
# Need to proceed with another peer (e.g. gap between queue and
191187
# header chain cache.)
192188
break
193189

190+
# End `while()`
191+
194192
# Fetch bodies and combine them with headers to blocks to be staged. These
195193
# staged blocks are then excuted by the daemon process (no `peer` needed.)
196-
while buddy.blocksStagedCollectOk():
194+
while buddy.blocksCollectOk():
197195

198196
# Collect bodies and either import them via `FC` module, or stage on
199197
# the blocks queue to get them serialised and imported, later.
200-
await buddy.blocksStagedCollect info
198+
await buddy.blocksCollect info
201199

202200
# Import bodies from the `staged` queue.
203-
if not await buddy.blocksStagedProcess info:
201+
if not await buddy.blocksUnstage info:
204202
# Need to proceed with another peer (e.g. gap between top imported
205203
# block and blocks queue.)
206204
break
207205

208-
# Note that it is important **not** to leave this function to be
209-
# re-invoked by the scheduler unless necessary. While the time gap
210-
# until restarting is typically a few millisecs, there are always
211-
# outliers which well exceed several seconds. This seems to let
212-
# remote peers run into timeouts so they eventually get lost early.
213-
214-
buddy.only.stoppedMultiRun = Moment.now() # statistics/debugging
206+
# End `while()`
215207

216208
# ------------------------------------------------------------------------------
217209
# End

execution_chain/sync/beacon/worker/blocks_staged.nim renamed to execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ import
1616
pkg/stew/[interval_set, sorted_set],
1717
../../../networking/p2p,
1818
../worker_desc,
19-
./blocks_staged/[bodies_fetch, staged_blocks],
20-
./blocks_unproc
19+
./blocks/[blocks_blocks, blocks_helpers, blocks_queue, blocks_unproc]
20+
21+
export
22+
blocks_queue, blocks_unproc
2123

2224
# ------------------------------------------------------------------------------
2325
# Private function(s)
2426
# ------------------------------------------------------------------------------
2527

2628
proc blocksStagedProcessImpl(
2729
ctx: BeaconCtxRef;
28-
maybePeer: Opt[Peer];
30+
maybePeer: Opt[BeaconBuddyRef];
2931
info: static[string];
3032
): Future[bool]
3133
{.async: (raises: []).} =
@@ -52,7 +54,7 @@ proc blocksStagedProcessImpl(
5254
# round: no unprocessed block number range precedes the least staged block.
5355
let minNum = qItem.data.blocks[0].header.number
5456
if ctx.subState.top + 1 < minNum:
55-
trace info & ": block queue not ready yet", peer=($maybePeer),
57+
trace info & ": block queue not ready yet", peer=maybePeer.toStr,
5658
topImported=ctx.subState.top.bnStr, qItem=qItem.data.blocks.bnStr,
5759
nStagedQ=ctx.blk.staged.len, nSyncPeers=ctx.pool.nBuddies
5860
switchPeer = true # there is a gap -- come back later
@@ -79,7 +81,7 @@ proc blocksStagedProcessImpl(
7981
nStagedQ=ctx.blk.staged.len, nSyncPeers=ctx.pool.nBuddies, switchPeer
8082

8183
elif 0 < ctx.blk.staged.len and not switchPeer:
82-
trace info & ": no blocks unqueued", peer=($maybePeer),
84+
trace info & ": no blocks unqueued", peer=maybePeer.toStr,
8385
topImported=ctx.subState.top.bnStr, nStagedQ=ctx.blk.staged.len,
8486
nSyncPeers=ctx.pool.nBuddies
8587

@@ -89,26 +91,18 @@ proc blocksStagedProcessImpl(
8991
# Public functions
9092
# ------------------------------------------------------------------------------
9193

92-
func blocksStagedCollectOk*(buddy: BeaconBuddyRef): bool =
94+
func blocksCollectOk*(buddy: BeaconBuddyRef): bool =
9395
## Check whether body records can be fetched and imported or stored
9496
## on the `staged` queue.
9597
##
9698
if buddy.ctrl.running:
9799
let ctx = buddy.ctx
98100
if 0 < ctx.blocksUnprocAvail() and
99-
not ctx.blocksModeStopped():
101+
not ctx.blkSessionStopped():
100102
return true
101103
false
102104

103-
proc blocksStagedProcessOk*(ctx: BeaconCtxRef): bool =
104-
## Check whether import processing is possible
105-
##
106-
not ctx.poolMode and
107-
0 < ctx.blk.staged.len
108-
109-
# --------------
110-
111-
proc blocksStagedCollect*(
105+
proc blocksCollect*(
112106
buddy: BeaconBuddyRef;
113107
info: static[string];
114108
) {.async: (raises: []).} =
@@ -168,7 +162,7 @@ proc blocksStagedCollect*(
168162
ctx.pool.seenData = true # blocks data exist
169163

170164
# Import blocks (no staging)
171-
await ctx.blocksImport(Opt.some(peer), blocks, buddy.peerID, info)
165+
await ctx.blocksImport(Opt.some(buddy), blocks, buddy.peerID, info)
172166

173167
# Import may be incomplete, so a partial roll back may be needed
174168
let lastBn = blocks[^1].header.number
@@ -225,24 +219,32 @@ proc blocksStagedCollect*(
225219

226220
info "Queued/staged or imported blocks",
227221
topImported=ctx.subState.top.bnStr,
228-
unprocBottom=(if ctx.blocksModeStopped(): "n/a"
222+
unprocBottom=(if ctx.blkSessionStopped(): "n/a"
229223
else: ctx.blocksUnprocAvailBottom.bnStr),
230224
nQueued, nImported, nStagedQ=ctx.blk.staged.len,
231225
nSyncPeers=ctx.pool.nBuddies
232226

227+
# --------------
233228

234-
template blocksStagedProcess*(
229+
proc blocksUnstageOk*(ctx: BeaconCtxRef): bool =
230+
## Check whether import processing is possible
231+
##
232+
not ctx.poolMode and
233+
0 < ctx.blk.staged.len
234+
235+
template blocksUnstage*(
235236
ctx: BeaconCtxRef;
236237
info: static[string];
237238
): auto =
238-
ctx.blocksStagedProcessImpl(Opt.none(Peer), info)
239+
ctx.blocksStagedProcessImpl(Opt.none(BeaconBuddyRef), info)
239240

240-
template blocksStagedProcess*(
241+
template blocksUnstage*(
241242
buddy: BeaconBuddyRef;
242243
info: static[string];
243244
): auto =
244-
buddy.ctx.blocksStagedProcessImpl(Opt.some(buddy.peer), info)
245+
buddy.ctx.blocksStagedProcessImpl(Opt.some(buddy), info)
245246

247+
# --------------
246248

247249
proc blocksStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
248250
## Some pool mode intervention.

execution_chain/sync/beacon/worker/blocks_staged/staged_blocks.nim renamed to execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import
1717
../../../../networking/p2p,
1818
../../../wire_protocol/types,
1919
../../worker_desc,
20-
../[blocks_unproc, update],
21-
./bodies_fetch
20+
../update,
21+
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
2222

2323
# ------------------------------------------------------------------------------
2424
# Private helpers
@@ -72,7 +72,7 @@ proc blocksFetchCheckImpl(
7272
request.blockHashes[^1] = blocks[^1].header.computeBlockHash
7373

7474
# Fetch bodies
75-
let bodies = (await buddy.bodiesFetch(request, info)).valueOr:
75+
let bodies = (await buddy.fetchBodies request).valueOr:
7676
return Opt.none(seq[EthBlock])
7777
if buddy.ctrl.stopped:
7878
return Opt.none(seq[EthBlock])
@@ -93,8 +93,8 @@ proc blocksFetchCheckImpl(
9393
break checkTxLenOk
9494
# Oops, cut off the rest
9595
blocks.setLen(n) # curb off junk
96-
buddy.fetchRegisterError()
97-
trace info & ": cut off junk blocks", peer, iv, n,
96+
buddy.bdyFetchRegisterError()
97+
trace info & ": Cut off junk blocks", peer, iv, n,
9898
nTxs=bodies[n].transactions.len, nBodies, bdyErrors=buddy.bdyErrors
9999
break loop
100100

@@ -121,13 +121,6 @@ proc blocksFetchCheckImpl(
121121
# Public functions
122122
# ------------------------------------------------------------------------------
123123

124-
func blocksModeStopped*(ctx: BeaconCtxRef): bool =
125-
## Helper, checks whether there is a general stop conditions based on
126-
## state settings (not on sync peer ctrl as `buddy.ctrl.running`.)
127-
ctx.poolMode or
128-
ctx.pool.lastState != blocks
129-
130-
131124
proc blocksFetch*(
132125
buddy: BeaconBuddyRef;
133126
num: uint;
@@ -153,7 +146,7 @@ proc blocksFetch*(
153146
# Job might have been cancelled or completed while downloading blocks.
154147
# If so, no more bookkeeping of blocks must take place. The *books*
155148
# might have been reset and prepared for the next stage.
156-
if ctx.blocksModeStopped():
149+
if ctx.blkSessionStopped():
157150
return Opt.none(seq[EthBlock]) # stop, exit this function
158151

159152
# Commit blocks received
@@ -167,7 +160,7 @@ proc blocksFetch*(
167160

168161
proc blocksImport*(
169162
ctx: BeaconCtxRef;
170-
maybePeer: Opt[Peer];
163+
maybePeer: Opt[BeaconBuddyRef];
171164
blocks: seq[EthBlock];
172165
peerID: Hash;
173166
info: static[string];
@@ -179,7 +172,7 @@ proc blocksImport*(
179172
let iv = BnRange.new(blocks[0].header.number, blocks[^1].header.number)
180173
doAssert iv.len == blocks.len.uint64
181174

182-
trace info & ": Start importing blocks", peer=($maybePeer), iv,
175+
trace info & ": Start importing blocks", peer=maybePeer.toStr, iv,
183176
nBlocks=iv.len, base=ctx.chain.baseNumber.bnStr,
184177
head=ctx.chain.latestNumber.bnStr
185178

@@ -188,16 +181,8 @@ proc blocksImport*(
188181
for n in 0 ..< blocks.len:
189182
let nBn = blocks[n].header.number
190183

191-
if nBn <= ctx.chain.baseNumber:
192-
trace info & ": ignoring block less eq. base", n, iv, nBlocks=iv.len,
193-
nthBn=nBn.bnStr, nthHash=ctx.getNthHash(blocks, n).short,
194-
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr
195-
196-
ctx.subState.top = nBn # well, not really imported
197-
continue
198-
199-
try:
200-
(await ctx.chain.queueImportBlock blocks[n]).isOkOr:
184+
(await ctx.importBlock(maybePeer, blocks[n], peerID)).isOkOr:
185+
if not error.cancelled:
201186
isError = true
202187

203188
# Mark peer that produced that unusable headers list as a zombie
@@ -231,11 +216,9 @@ proc blocksImport*(
231216
head=ctx.chain.latestNumber.bnStr,
232217
blkFailCount=ctx.subState.procFailCount, `error`=error
233218

234-
break loop # stop
235-
# isOk => next instruction
236-
except CancelledError:
237-
break loop # shutdown?
219+
break loop
238220

221+
# isOk => next instruction
239222
ctx.subState.top = nBn # Block imported OK
240223

241224
# Allow pseudo/async thread switch.

0 commit comments

Comments
 (0)