Skip to content

Commit 4c3cfc9

Browse files
authored
Beacon sync provide peer status msg info (#3879)
* Scheduler supports an init function independent of the worker `start()` why The init function can be made dependent on the accepted sub-protocol thus avoiding run time queries of the protocol type when loading parts of the status message record for the ethXX protocol. This approach allows for varying compile time sub-protocol code. * Provide bestBlockHash/latestHash for the `beacon` sync worker descriptor why This is not needed by the `beacon` syncer but is useful for the `snap` syncer which uses this block hash as an internal sync target. * Code cosmetics, logging update
1 parent 17182b4 commit 4c3cfc9

File tree

10 files changed

+91
-53
lines changed

10 files changed

+91
-53
lines changed

execution_chain/sync/beacon.nim

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,19 @@ logScope:
3232

3333
proc addBeaconSyncProtocol(desc: BeaconSyncRef; PROTO: type) =
3434
## Add protocol and callback filter function for ethXX
35-
desc.addSyncProtocol(PROTO):
36-
proc(peer: Peer): bool =
37-
let state = peer.state(PROTO)
38-
not state.isNil and state.initialized
35+
proc acceptPeer(peer: Peer): bool =
36+
let state = peer.state(PROTO)
37+
not state.isNil and state.initialized
38+
39+
proc initWorker(worker: SyncPeerRef[BeaconCtxData,BeaconPeerData]) =
40+
when PROTO is eth68:
41+
worker.only.pivotHash = worker.peer.state(PROTO).bestHash
42+
elif PROTO is eth69:
43+
worker.only.pivotHash = worker.peer.state(PROTO).latestHash
44+
else:
45+
{.error: "Unsupported eth/?? version".}
46+
47+
desc.addSyncProtocol(PROTO, acceptPeer, initWorker)
3948

4049
# ------------------------------------------------------------------------------
4150
# Virtual methods/interface, `mixin` functions

execution_chain/sync/beacon/worker.nim

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import
1919
./worker/headers/headers_target,
2020
./worker/[blocks, classify, headers, helpers, start_stop, update, worker_desc]
2121

22+
logScope:
23+
topics = "beacon sync"
24+
2225
# ------------------------------------------------------------------------------
2326
# Public start/stop and admin functions
2427
# ------------------------------------------------------------------------------

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ template blocksFetchCheckImpl(
5656
block body:
5757
let
5858
ctx = buddy.ctx
59-
iv {.inject,used.} = iv
6059
peer {.inject,used.} = $buddy.peer # logging only
6160

6261
# Preset headers to be completed with bodies. Also collect block
@@ -68,16 +67,16 @@ template blocksFetchCheckImpl(
6867
for n in 1u ..< iv.len:
6968
let header = ctx.hdrCache.get(iv.minPt + n).valueOr:
7069
# There is nothing one can do here
71-
chronicles.info "Block header missing (reorg triggered)", peer, iv, n,
72-
nth=(iv.minPt + n)
70+
chronicles.info "Block header missing (reorg triggered)", peer,
71+
iv=($iv), n, nth=(iv.minPt + n)
7372
ctx.subState.cancelRequest = true # So require reorg
7473
break body # return err()
7574
request.blockHashes[n - 1] = header.parentHash
7675
blocks[n].header = header
7776
blocks[0].header = ctx.hdrCache.get(iv.minPt).valueOr:
7877
# There is nothing one can do here
79-
chronicles.info "Block header missing (reorg triggered)", peer, iv, n=0,
80-
nth=iv.minPt
78+
chronicles.info "Block header missing (reorg triggered)", peer,
79+
iv=($iv), n=0, nth=iv.minPt
8180
ctx.subState.cancelRequest = true # So require reorg
8281
break body # return err()
8382
request.blockHashes[^1] = blocks[^1].header.computeBlockHash
@@ -105,7 +104,7 @@ template blocksFetchCheckImpl(
105104
# Oops, cut off the rest
106105
blocks.setLen(n) # curb off junk
107106
buddy.bdyFetchRegisterError()
108-
trace info & ": Cut off junk blocks", peer, iv, n=n,
107+
trace info & ": Cut off junk blocks", peer, iv=($iv), n=n,
109108
nTxs=bodies[n].transactions.len, nBodies,
110109
nErrors=buddy.nErrors.fetch.bdy
111110
break loop
@@ -204,7 +203,7 @@ template blocksImport*(
204203

205204
var isError = false
206205
block loop:
207-
trace info & ": start importing blocks", peer, iv, nBlocks=iv.len,
206+
trace info & ": start importing blocks", peer, iv=($iv), nBlocks=iv.len,
208207
base=ctx.chain.baseNumber, head=ctx.chain.latestNumber
209208

210209
for n in 0 ..< blocks.len:
@@ -232,21 +231,24 @@ template blocksImport*(
232231

233232
# Proper logging ..
234233
if ctx.subState.cancelRequest:
235-
warn "Blocks import error (cancel this session)", n=n, iv,
234+
warn "Blocks import error (cancel this session)", n=n,
235+
iv=($iv),
236236
nBlocks=iv.len, nthBn,
237237
nthHash=ctx.getNthHash(blocks, n).short,
238238
base=ctx.chain.baseNumber,
239239
head=ctx.chain.latestNumber,
240240
blkFailCount=ctx.subState.procFailCount, error=error.toStr
241241
elif error.excp == ESyncerTermination:
242-
chronicles.debug "Blocks import error (skip remaining)", n=n, iv,
242+
chronicles.debug "Blocks import error (skip remaining)", n=n,
243+
iv=($iv),
243244
nBlocks=iv.len, nthBn,
244245
nthHash=ctx.getNthHash(blocks, n).short,
245246
base=ctx.chain.baseNumber,
246247
head=ctx.chain.latestNumber,
247248
blkFailCount=ctx.subState.procFailCount, error=error.toStr
248249
else:
249-
chronicles.info "Blocks import error (skip remaining)", n=n, iv,
250+
chronicles.info "Blocks import error (skip remaining)", n=n,
251+
iv=($iv),
250252
nBlocks=iv.len, nthBn,
251253
nthHash=ctx.getNthHash(blocks, n).short,
252254
base=ctx.chain.baseNumber,

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func errStr(rc: Result[FetchBodiesData,BeaconError]): string =
4545
result = $rc.error.excp
4646
if 0 < rc.error.name.len:
4747
result &= "(" & rc.error.name & ")"
48+
if 0 < rc.error.msg.len:
49+
result &= "[" & rc.error.msg & "]"
4850
else:
4951
result = "n/a"
5052

@@ -105,7 +107,7 @@ template fetchBodies*(
105107
nReq {.inject,used.} = request.blockHashes.len
106108

107109
if request.blockHashes.len == 0:
108-
trace sendInfo & " empty request", peer, nReq,
110+
trace sendInfo & " empty request", peer, nReq, state=($buddy.syncState),
109111
nErrors=buddy.nErrors.fetch.bdy
110112
break body
111113

@@ -140,16 +142,19 @@ template fetchBodies*(
140142
# Debug message for other errors
141143
debug recvInfo & " error", peer, startHash=startHash.short, nReq,
142144
ela=elapsed.toStr, state=($buddy.syncState), error=rc.errStr,
143-
msg=rc.error.msg, nErrors=buddy.nErrors.fetch.bdy
145+
nErrors=buddy.nErrors.fetch.bdy
144146
break body # return err()
145147

148+
let
149+
ela {.inject,used.} = elapsed.toStr # logging only
150+
state {.inject,used.} = $buddy.syncState # logging only
151+
146152
# Evaluate result
147153
if rc.isErr or buddy.ctrl.stopped:
148154
if not buddy.maybeSlowPeerError(elapsed, request.blockHashes[0]):
149155
buddy.bdyFetchRegisterError()
150156
trace recvInfo & " error", peer, startHash=startHash.short, nReq,
151-
ela=elapsed.toStr, state=($buddy.syncState),
152-
error=rc.errStr, nErrors=buddy.nErrors.fetch.bdy
157+
ela, state, error=rc.errStr, nErrors=buddy.nErrors.fetch.bdy
153158
break body # return err()
154159

155160
# Verify the correct number of block bodies received
@@ -167,8 +172,7 @@ template fetchBodies*(
167172
discard buddy.maybeSlowPeerError(elapsed, request.blockHashes[0])
168173

169174
trace recvInfo & " error", peer, startHash=startHash.short, nReq,
170-
nResp=b.len, ela=elapsed.toStr, state=($buddy.syncState),
171-
nErrors=buddy.nErrors.fetch.bdy
175+
nResp=b.len, ela, state, nErrors=buddy.nErrors.fetch.bdy
172176
break body # return err()
173177

174178
# Update download statistics
@@ -184,9 +188,8 @@ template fetchBodies*(
184188
buddy.nErrors.fetch.bdy = 0 # reset error count
185189
buddy.ctx.pool.lastSlowPeer = Opt.none(Hash) # not last one or not error
186190

187-
trace recvInfo, peer, startHash=startHash.short, nReq,
188-
nResp=b.len, ela=elapsed.toStr, thPut=(bps.toIECb(1) & "ps"),
189-
state=($buddy.syncState), nErrors=buddy.nErrors.fetch.bdy
191+
trace recvInfo, peer, startHash=startHash.short, nReq, nResp=b.len, ela,
192+
thPut=(bps.toIECb(1) & "ps"), state, nErrors=buddy.nErrors.fetch.bdy
190193

191194
bodyRc = Opt[seq[BlockBody]].ok(b)
192195

execution_chain/sync/beacon/worker/headers/headers_fetch.nim

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func errStr(rc: Result[FetchHeadersData,BeaconError]): string =
4545
result = $rc.error.excp
4646
if 0 < rc.error.name.len:
4747
result &= "(" & rc.error.name & ")"
48+
if 0 < rc.error.msg.len:
49+
result &= "[" & rc.error.msg & "]"
4850
else:
4951
result = "n/a"
5052

@@ -105,6 +107,7 @@ template fetchHeadersReversed*(
105107
recvInfo = trEthRecvReceivedBlockHeaders
106108
let
107109
peer {.inject,used.} = $buddy.peer # logging only
110+
hash {.inject,used.} = topHash.toStr # logging only
108111
req = block:
109112
if topHash != emptyRoot:
110113
BlockHeadersRequest(
@@ -123,8 +126,8 @@ template fetchHeadersReversed*(
123126
isHash: false,
124127
number: ivReq.maxPt))
125128

126-
trace sendInfo & " reverse", peer, req=ivReq,
127-
nReq=req.maxResults, hash=topHash.toStr, nErrors=buddy.nErrors.fetch.hdr
129+
trace sendInfo & " reverse", peer, req=($ivReq), nReq=req.maxResults, hash,
130+
state=($buddy.syncState), nErrors=buddy.nErrors.fetch.hdr
128131

129132
let rc = await buddy.getBlockHeaders(req, BlockNumber ivReq.maxPt)
130133
var elapsed: Duration
@@ -143,24 +146,27 @@ template fetchHeadersReversed*(
143146
buddy.hdrFetchRegisterError()
144147
buddy.hdrNoSampleSize(elapsed)
145148
of EAlreadyTriedAndFailed:
146-
trace recvInfo & " error", peer, req=ivReq, nReq=req.maxResults,
147-
hash=topHash.toStr, ela=elapsed.toStr, state=($buddy.syncState),
148-
error=rc.errStr, nErrors=buddy.nErrors.fetch.hdr
149+
trace recvInfo & " error", peer, req=($ivReq), nReq=req.maxResults,
150+
hash, ela=elapsed.toStr, state=($buddy.syncState), error=rc.errStr,
151+
nErrors=buddy.nErrors.fetch.hdr
149152
break body # return err()
150153

151154
# Debug message for other errors
152-
debug recvInfo & " error", peer, req=ivReq, nReq=req.maxResults,
153-
hash=topHash.toStr, ela=elapsed.toStr, state=($buddy.syncState),
154-
error=rc.errStr, msg=rc.error.msg, nErrors=buddy.nErrors.fetch.hdr
155+
debug recvInfo & " error", peer, req=($ivReq), nReq=req.maxResults,
156+
hash, ela=elapsed.toStr, state=($buddy.syncState), error=rc.errStr,
157+
nErrors=buddy.nErrors.fetch.hdr
155158
break body # return err()
156159

160+
let
161+
ela {.inject,used.} = elapsed.toStr # logging only
162+
state {.inject,used.} = $buddy.syncState # logging only
163+
157164
# Evaluate result
158165
if rc.isErr or buddy.ctrl.stopped:
159166
if not buddy.maybeSlowPeerError(elapsed, BlockNumber ivReq.maxPt):
160167
buddy.hdrFetchRegisterError()
161-
trace recvInfo & " error", peer, nReq=req.maxResults, hash=topHash.toStr,
162-
nResp=0, ela=elapsed.toStr, state=($buddy.syncState),
163-
error=rc.errStr, nErrors=buddy.nErrors.fetch.hdr
168+
trace recvInfo & " error", peer, req=($ivReq), nReq=req.maxResults, hash,
169+
nResp=0, ela, state, error=rc.errStr, nErrors=buddy.nErrors.fetch.hdr
164170
break body # return err()
165171

166172
# Verify the correct number of block headers received
@@ -177,18 +183,16 @@ template fetchHeadersReversed*(
177183
# Slow response, definitely not fast enough
178184
discard buddy.maybeSlowPeerError(elapsed, BlockNumber ivReq.maxPt)
179185

180-
trace recvInfo & " error", peer, nReq=req.maxResults, hash=topHash.toStr,
181-
nResp=h.len, ela=elapsed.toStr, state=($buddy.syncState),
182-
nErrors=buddy.nErrors.fetch.hdr
186+
trace recvInfo & " error", peer, nReq=req.maxResults, hash, nResp=h.len,
187+
ela, state, nErrors=buddy.nErrors.fetch.hdr
183188
break body # return err()
184189

185190
# Verify that the first block number matches the request
186191
if h[^1].number != ivReq.minPt and ivReq.minPt != 0:
187192
buddy.hdrFetchRegisterError(forceZombie=true)
188-
trace recvInfo & " error", peer, nReq=req.maxResults, hash=topHash.toStr,
193+
trace recvInfo & " error", peer, nReq=req.maxResults, hash,
189194
reqMinPt=ivReq.minPt, respMinPt=h[^1].number, nResp=h.len,
190-
ela=elapsed.toStr, state=($buddy.syncState),
191-
nErrors=buddy.nErrors.fetch.hdr
195+
ela, state, nErrors=buddy.nErrors.fetch.hdr
192196
break body
193197

194198
# Update download statistics
@@ -204,10 +208,9 @@ template fetchHeadersReversed*(
204208
buddy.nErrors.fetch.hdr = 0 # reset error count
205209
buddy.ctx.pool.lastSlowPeer = Opt.none(Hash) # not last one or not error
206210

207-
trace recvInfo, peer, nReq=req.maxResults, hash=topHash.toStr,
208-
ivResp=(h[^1].number,h[0].number).toStr, nResp=h.len, ela=elapsed.toStr,
209-
thPut=(bps.toIECb(1) & "ps"), state=($buddy.syncState),
210-
nErrors=buddy.nErrors.fetch.hdr
211+
trace recvInfo, peer, nReq=req.maxResults, hash, ivResp=(h[^1].number,
212+
h[0].number).toStr, nResp=h.len, ela, thPut=(bps.toIECb(1) & "ps"),
213+
state, nErrors=buddy.nErrors.fetch.hdr
211214

212215
bodyRc = Opt[seq[Header]].ok(h)
213216

execution_chain/sync/beacon/worker/worker_desc.nim

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type
149149

150150
BeaconPeerData* = object
151151
## Local descriptor data extension
152+
pivotHash*: Hash32 ## Peer best/latest hash (for `snap` sync)
152153
nErrors*: PeerErrors ## Error register
153154
thPutStats*: ThPutStats ## Throughput statistics
154155
failedReq*: PeerFirstFetchReq ## Avoid sending the same request twice

execution_chain/sync/sync_sched.nim

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ type
110110
allRunning ## Running, full support
111111
standByMode ## Suspending worker and daemon loop
112112

113-
PeerProtoCheck = ref object
113+
PeerProtoCheck[S,W] = ref object
114114
hasProto: AcceptPeerOk ## Sub protocol selector closure
115115
acceptPeer: AcceptPeerOk ## Can accept protocol enabled by `hasProto()`
116+
initWorker: InitWorker[S,W] ## Initialise if `acceptPeer()` succeeds
116117

117118
RunnerPeerRef[S,W] = ref object
118119
## Per worker peer descriptor
@@ -123,6 +124,7 @@ type
123124
# ---- public types ----
124125

125126
AcceptPeerOk* = proc(peer: Peer): bool {.gcsafe, raises: [].}
127+
InitWorker*[S,W] = proc(worker: SyncPeerRef[S,W]) {.gcsafe, raises: [].}
126128

127129
RunnerSyncRef*[S,W] = ref object of RootRef
128130
## Module descriptor
@@ -139,7 +141,7 @@ type
139141
activeMulti: int ## Number of async workers active/running
140142
runCtrl: RunCtrl ## Overall scheduler start/stop control
141143
po: PeerObserver ## P2p protocol handler environment
142-
filter: seq[PeerProtoCheck] ## List of p2p sub-protocol handler filters
144+
filter: seq[PeerProtoCheck[S,W]] ## List of p2p sub-protocol handler filters
143145

144146
const
145147
tickerExecLoopWaitInterval = 5.seconds
@@ -197,10 +199,13 @@ template lruReset(db: untyped): untyped =
197199
## Clear LRU list
198200
db = typeof(db).init db.capacity
199201

200-
proc alwaysAcceptPeerOk(peer: Peer): bool =
202+
func alwaysAcceptPeerOk(peer: Peer): bool =
201203
## Some default call back function
202204
true
203205

206+
func noopInitWorker[S,W](w: SyncPeerRef[S,W]) =
207+
discard
208+
204209
# ------------------------------------------------------------------------------
205210
# Private constructor helpers
206211
# ------------------------------------------------------------------------------
@@ -533,11 +538,14 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
533538
if dsc.runCtrl notin {allRunning,standByMode}:
534539
return
535540

541+
var initWorker: InitWorker[S,W]
536542
block protoFilter:
537543
# Accept if accepted by some vetting filter
538544
for filter in dsc.filter:
539545
if filter.hasProto(peer) and
540546
filter.acceptPeer(peer):
547+
# Load per-protocol initialisation
548+
initWorker = filter.initWorker
541549
break protoFilter
542550
# Otherwise ignore
543551
trace "No suitable protocol for peer", peer
@@ -574,6 +582,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
574582
ctx: dsc.ctx,
575583
peer: peer,
576584
peerID: peerID))
585+
buddy.worker.initWorker() # function was set above
577586
if not buddy.worker.runStart():
578587
trace "Ignoring useless peer", peer,
579588
nSyncPeers=dsc.nSyncPeers(), nSyncPeersMax=dsc.syncPeers.capacity,
@@ -662,18 +671,21 @@ proc initSync*[S,W](
662671
proc addSyncProtocol*[S,W](
663672
dsc: RunnerSyncRef[S,W];
664673
PROTO: type;
665-
acceptPeer = AcceptPeerOk(nil);
674+
acceptPeer: AcceptPeerOk = nil;
675+
initWorker: InitWorker[S,W] = nil;
666676
) =
667677
## Activate scheduler for a particular protocol. The filter argument
668678
## function `acceptPeer` is run before any other connection handler. If
669679
## the former returns `true`, processing goes ahead.
670680
##
671681
dsc.po.addProtocol PROTO
672-
dsc.filter.add PeerProtoCheck(
682+
dsc.filter.add PeerProtoCheck[S,W](
673683
hasProto: proc(p: Peer): bool {.gcsafe.} =
674684
p.supports(PROTO),
675685
acceptPeer:
676-
(if acceptPeer.isNil: alwaysAcceptPeerOk else: acceptPeer))
686+
(if acceptPeer.isNil: alwaysAcceptPeerOk else: acceptPeer),
687+
initWorker:
688+
(if initWorker.isNil: noopInitWorker[S,W] else: initWorker))
677689

678690
# ---------
679691

@@ -702,9 +714,10 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; standBy = false): bool =
702714
# Activate protocol handlers
703715
dsc.peerPool.addObserver(dsc, dsc.po)
704716
if dsc.filter.len == 0:
705-
dsc.filter.add PeerProtoCheck(
717+
dsc.filter.add PeerProtoCheck[S,W](
706718
hasProto: alwaysAcceptPeerOk,
707-
acceptPeer: alwaysAcceptPeerOk)
719+
acceptPeer: alwaysAcceptPeerOk,
720+
initWorker: noopInitWorker[S,W])
708721

709722
asyncSpawn dsc.tickerLoop()
710723
return true

execution_chain/sync/wire_protocol/requester.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export
2323
defineProtocol(PROTO = eth68,
2424
version = 68,
2525
rlpxName = "eth",
26-
PeerStateType = EthPeerState,
26+
PeerStateType = Eth68PeerState,
2727
NetworkStateType = EthWireRef)
2828

2929
defineProtocol(PROTO = eth69,

0 commit comments

Comments
 (0)