perf(netsync): batch subtree writes during legacy catch-up#724
perf(netsync): batch subtree writes during legacy catch-up#724freemans13 wants to merge 31 commits into
Conversation
…tch-up writeSubtree previously issued one filestore write + fsync per subtree per block. During legacy sync this contributed to mainnet's I/O saturation (sdb 90% util bursts in profile captured 2026-04-17). Introduce a SubtreeWriteBatcher that is lazily constructed on first prepareSubtrees call in quickValidationMode. It accumulates subtree/subtreeData/subtreeMeta payloads and flushes on block count (default 8) or wall-clock interval (default 500ms). Flush-on-stop invariant preserves shutdown durability. Outside catch-up, writeSubtree falls through to writeSubtreeDirect with identical semantics. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
🤖 Claude Code Review Status: Complete No critical issues found. The implementation is well-structured with comprehensive documentation, proper concurrency safety, thorough test coverage, and clear fail-fast error handling semantics. Key strengths:
History:
|
| treeFileType = fileformat.FileTypeSubtree | ||
| } | ||
|
|
||
| if err := sm.subtreeWriteBatcher.Submit(SubtreeWriteItem{Kind: SubtreeKindTree, FileType: treeFileType, RootHash: treeRootHash, Bytes: subtreeBytes, DeleteAt: dah, BlockHeight: block.Height()}); err != nil { |
There was a problem hiding this comment.
[Minor] Context propagation inconsistency
The batched write path on line 428 submits items that will later be flushed using context.Background() (see line 120 in Submit and line 171 in timerLoop), discarding the ctx parameter passed to writeSubtree.
In contrast, writeSubtreeDirect uses the provided context throughout (line 439), allowing cancellation and tracing to propagate correctly.
Impact: During catch-up, if the parent context is cancelled (e.g., shutdown signal), already-queued items will flush with context.Background() and ignore the cancellation. The Stop() call will still drain pending items, so this is primarily a tracing/observability gap rather than a correctness issue.
Suggestion: Consider passing ctx through to Submit and storing it with each item, or document this intentional behavior if the background context is preferred for durability guarantees.
There was a problem hiding this comment.
✅ Fixed in latest commit
The code now passes ctx to Submit() on lines 467-473, so the caller context propagates correctly for count-threshold flushes.
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-06-02 09:09 UTC |
…ability SubtreeWriteBatcher.Submit() flushes synchronously only when the buffer reaches its size threshold; otherwise items wait for the timer (default 500ms). The immediate caller, writeSubtree, returns to HandleBlockDirect right after the three Submit() calls, and HandleBlockDirect then invokes ProcessBlock. blockvalidation.CheckBlockSubtrees reads the subtree files from disk during validation. With the old default of 8 blocks, the first seven blocks of a catch-up burst queued without flushing. CheckBlockSubtrees looked for subtree files that were still only in memory, missed, and fell back to an HTTP fetch built from the "legacy" baseURL sentinel — which has no scheme and always fails. The block was rejected, the peer looked stalled, and the loop repeated. Default to 1 block per batch so each block's three items (tree + data + meta) cross the threshold on their third Submit(), flushing in-line before writeSubtree returns. The batcher code stays in place for future callers that can tolerate out-of-band durability; changing the default restores the contract that writeSubtree implies for its current sole caller. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`extendTransactions` previously issued one PreviousOutputsDecorate DB call
per transaction. On a 20,981-tx testnet block we observed this step take
3m 32s — 94% of total block-processing time — because the non-local-parent
path scaled linearly with block size: one IN-clause query per tx, serialised
only by the outpoint-batcher goroutine limit.
Split extendTransactions into two phases:
1. Same-block parents via txMap, in per-tx goroutines (unchanged behaviour,
waits for the parent tx to finish being extended). Pulled out into
`extendFromTxMap`.
2. Non-local parents via a single `BatchPreviousOutputsDecorate` call. The
batch skips inputs already populated by phase 1, so only still-unfilled
inputs hit the database. For a 20k-tx block that's one chunked query
instead of 20k.
On a batch-decoration error that resembles "parent pruned / missing", fall
back to the original per-tx path so the DAH'd-parent recovery
(`utxoStore.Get(txHash, fields.Tx)`) still applies.
Also convert `BatchPreviousOutputsDecorate` to use a composite
`(t.hash, o.idx)` IN predicate — previously flagged as a TODO in the file
because the bulk path wasn't on the legacy-sync hot path. Now that it is,
the composite predicate avoids scanning every output of every referenced
parent, which is the behaviour that made PreviousOutputsDecorate dominate
heap on data-carrier-heavy blocks.
Expected impact on testnet 20k-tx block: extendTransactions 3m 32s → <10s.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous implementation polled prevTxWrapper.Tx.IsExtended() on a 10ms tick before reading the parent's Outputs. That's unnecessary: Outputs are populated at wire-parse time and never mutated afterwards; IsExtended only checks whether the parent's *inputs* have PreviousTxScript set, which is unrelated to the parent's outputs that the child needs. The wait also introduced a deadlock under the new two-phase extendTransactions flow: a pure-non-local-parent tx only becomes "extended" after phase 2 (batch DB call). Phase 1 goroutines waiting for such a parent would time out at 120s and fail the block. Observed on testnet block 1630240. Drop the poll and read the parent's outputs directly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nto stu/netsync-subtree-write-batch
…ationMode The mode gate isn't really about validation speed — it's about the reader-side durability contract: CheckBlockSubtrees reads subtree files synchronously after writeSubtree returns and expects them on disk. quickValidationMode skips that reader, so the batcher can hold items in memory. The direct path also streams to filestorer while the batcher must materialise []byte, bounding memory risk to the (checkpoint-anchored) catch-up window. Expand the comment on the batcher construction site and the writeSubtree dispatcher to spell this out, since the rationale had been implicit in a buried comment inside the batcher branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR targets legacy catch-up performance by reducing subtree filestore write/fsync overhead during quickValidationMode, and also improves previous-output decoration efficiency during legacy block handling.
Changes:
- Added a
SubtreeWriteBatcherto batch subtree blob writes duringquickValidationMode, with new legacy settings controlling batch size and flush interval. - Wired the batcher into netsync (
prepareSubtrees/writeSubtree) with a concurrent flush callback, plus shutdown draining inSyncManager.Stop. - Refactored legacy
extendTransactionsinto a two-phase flow and updated SQL UTXO decoration to use composite(hash, idx)IN queries to avoid scanning unreferenced outputs.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
stores/utxo/sql/sql.go |
Changes batch decoration query to fetch only exact (txhash, outidx) pairs via composite IN. |
settings/settings.go |
Adds defaults for subtree write batching knobs. |
settings/legacy_settings.go |
Exposes new legacy settings keys/descriptions for subtree write batching. |
services/legacy/netsync/subtree_write_batcher.go |
Introduces the batching primitive used by netsync during catch-up. |
services/legacy/netsync/subtree_write_batcher_test.go |
Adds unit tests covering count/timer/stop flush behavior and error surfacing. |
services/legacy/netsync/manager.go |
Ensures batcher is drained on SyncManager shutdown. |
services/legacy/netsync/handle_block.go |
Integrates batcher into writeSubtree, adds flush callback, and refactors transaction extension to use batch decoration. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if toFlush != nil { | ||
| return b.flushFn(context.Background(), toFlush) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Stop drains and shuts down. Returns the error from the final flush, if any. | ||
| func (b *SubtreeWriteBatcher) Stop(ctx context.Context) error { | ||
| if err := b.takeLastErr(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| b.mu.Lock() | ||
| if b.stopped { | ||
| b.mu.Unlock() | ||
| return nil | ||
| } | ||
| b.stopped = true | ||
| pending := b.buf | ||
| b.buf = nil | ||
| close(b.stopCh) | ||
| b.mu.Unlock() | ||
|
|
||
| b.wg.Wait() | ||
|
|
||
| if len(pending) > 0 { | ||
| return b.flushFn(ctx, pending) | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Stop() does not wait for a count-threshold flush that was triggered by Submit() just before Stop acquired the mutex. Because Submit() performs flushFn() outside the batcher goroutine and is not tracked by b.wg, Stop() can return before that in-flight flush completes, violating the documented “all pending items flushed before Stop returns” contract and risking data loss on shutdown/mode transitions. Consider tracking all flushFn invocations with a dedicated WaitGroup (increment before calling flushFn in Submit/timerLoop and decrement after) and waiting for it in Stop() before returning.
There was a problem hiding this comment.
Fixed. Added a dedicated inflight WaitGroup that covers every flushFn invocation (Submit count-threshold flushes, timerLoop, and the final Stop-path flush). The slot is reserved under the same mutex that gates stopped/buf so a Submit flush racing Stop cannot slip past. Stop now blocks on inflight.Wait() in addition to the timer goroutine wg before returning.
| // Submit queues one write. May trigger a synchronous flush on count threshold. | ||
| func (b *SubtreeWriteBatcher) Submit(item SubtreeWriteItem) error { | ||
| if err := b.takeLastErr(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| b.mu.Lock() | ||
| if b.stopped { | ||
| b.mu.Unlock() | ||
| return errors.NewProcessingError("SubtreeWriteBatcher: submit after stop") | ||
| } | ||
| if len(b.buf) == 0 { | ||
| b.oldest = time.Now() | ||
| } | ||
| b.buf = append(b.buf, item) | ||
| shouldFlush := len(b.buf) >= b.maxItems | ||
| var toFlush []SubtreeWriteItem | ||
| if shouldFlush { | ||
| toFlush = b.buf | ||
| b.buf = nil | ||
| } | ||
| b.mu.Unlock() | ||
|
|
||
| if toFlush != nil { | ||
| return b.flushFn(context.Background(), toFlush) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Stop drains and shuts down. Returns the error from the final flush, if any. | ||
| func (b *SubtreeWriteBatcher) Stop(ctx context.Context) error { | ||
| if err := b.takeLastErr(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| b.mu.Lock() | ||
| if b.stopped { | ||
| b.mu.Unlock() | ||
| return nil | ||
| } | ||
| b.stopped = true | ||
| pending := b.buf | ||
| b.buf = nil | ||
| close(b.stopCh) | ||
| b.mu.Unlock() | ||
|
|
||
| b.wg.Wait() | ||
|
|
||
| if len(pending) > 0 { | ||
| return b.flushFn(ctx, pending) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (b *SubtreeWriteBatcher) timerLoop() { | ||
| defer b.wg.Done() | ||
| ticker := time.NewTicker(b.maxWait / 2) | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-b.stopCh: | ||
| return | ||
| case <-ticker.C: | ||
| b.mu.Lock() | ||
| if len(b.buf) == 0 { | ||
| b.mu.Unlock() | ||
| continue | ||
| } | ||
| if time.Since(b.oldest) < b.maxWait { | ||
| b.mu.Unlock() | ||
| continue | ||
| } | ||
| toFlush := b.buf | ||
| b.buf = nil | ||
| b.mu.Unlock() | ||
| if err := b.flushFn(context.Background(), toFlush); err != nil { | ||
| if b.logger != nil { |
There was a problem hiding this comment.
Submit()/timerLoop call flushFn with context.Background(), which drops cancellation/timeouts and any tracing context from the caller. In particular, writeSubtree() passes a ctx that may be cancelled on shutdown or block-processing abort, but count-based flushes will ignore it. Consider changing Submit to accept a context (or storing a context in the batcher) and passing that through to flushFn so flushes can be cancelled and can participate in tracing/log scoping.
There was a problem hiding this comment.
Fixed. Submit now takes a ctx context.Context parameter which is forwarded to flushFn for count-threshold flushes. writeSubtree passes the caller ctx through, so a cancellation on shutdown or block-processing abort now propagates. Timer-path flushes continue to use context.Background() since they have no caller ctx to inherit, and errors still surface via lastErr.
| subtreeData *subtreepkg.Data, subtreeMetaData *subtreepkg.Meta, quickValidationMode bool) error { | ||
| if !quickValidationMode && sm.subtreeWriteBatcher != nil { | ||
| if err := sm.subtreeWriteBatcher.Stop(ctx); err != nil { | ||
| sm.logger.Errorf("[writeSubtree] draining SubtreeWriteBatcher on quickValidationMode transition: %v", err) |
There was a problem hiding this comment.
On quickValidationMode transition, batcher.Stop(ctx) errors are only logged and then the code proceeds to the direct write path. If Stop fails, there may be buffered subtree items that never made it to disk, and subsequent validation/readers will see missing blobs. Consider returning the Stop error (or wrapping it) so block processing fails fast rather than continuing with a partially persisted subtree set.
| sm.logger.Errorf("[writeSubtree] draining SubtreeWriteBatcher on quickValidationMode transition: %v", err) | |
| return fmt.Errorf("[writeSubtree] draining SubtreeWriteBatcher on quickValidationMode transition: %w", err) |
There was a problem hiding this comment.
Fixed. writeSubtree now returns the wrapped Stop error instead of falling through to the direct path, so block processing fails fast if the drain failed. Using errors.NewStorageError so callers can classify the failure the same way as other blob-store errors.
| g.Go(func() error { | ||
| txWrapper.SomeParentsInBlock = true | ||
|
|
There was a problem hiding this comment.
txWrapper.SomeParentsInBlock is written from multiple per-input goroutines without synchronization, which will trigger a data race under -race and can lead to undefined behavior. Set this flag before launching the goroutine (when you detect a same-block parent), or protect it with a mutex/atomic.
| g.Go(func() error { | |
| txWrapper.SomeParentsInBlock = true | |
| txWrapper.SomeParentsInBlock = true | |
| g.Go(func() error { |
There was a problem hiding this comment.
Fixed. Moved txWrapper.SomeParentsInBlock = true outside the goroutine so the write happens on the single loop goroutine rather than the per-input errgroup workers. All writers set the same value, but an unsynchronised concurrent write is still a data race under -race.
| // Let the timer fire and the flush error to be captured | ||
| time.Sleep(300 * time.Millisecond) | ||
|
|
||
| // Next Submit should surface the captured error | ||
| err := b.Submit(SubtreeWriteItem{Kind: SubtreeKindData}) | ||
| require.ErrorIs(t, err, boom) |
There was a problem hiding this comment.
This test uses a fixed time.Sleep to wait for the timer flush to run, which can be flaky or unnecessarily slow under variable CI load. Prefer require.Eventually (or a signal/channel from flushFn) to deterministically wait until the timer-path flush has executed and its error has been captured.
There was a problem hiding this comment.
Fixed. Replaced the fixed time.Sleep with a flushed channel signalled from inside the flush callback, plus a require.Eventually on the observable Submit-returns-boom condition. This removes the fixed-timeout dependency while still tolerating the small window between the flush signal and lastErr being published.
| // Phase 2: for inputs whose parents are NOT same-block, batch the decoration into | ||
| // a single IN-clause DB query per chunk instead of issuing one per tx. For a | ||
| // 20k-tx block this collapses ~20k round-trips into O(N / maxINClauseSize). | ||
| // | ||
| // BatchPreviousOutputsDecorate skips inputs that already have PreviousTxScript set, | ||
| // so Phase 1's work is preserved. If it returns a processing/not-found error the | ||
| // most likely cause is a parent that's been pruned (DAH'd) because the child | ||
| // already had a prior processing pass. Fall back to per-tx decoration so the | ||
| // existing recovery path (utxoStore.Get on the child itself) can still kick in. | ||
| if batchErr := sm.utxoStore.BatchPreviousOutputsDecorate(ctx, txs); batchErr != nil { | ||
| if errors.Is(batchErr, errors.ErrProcessing) || errors.Is(batchErr, errors.ErrTxNotFound) { | ||
| return sm.extendPerTxFallback(ctx, txs) | ||
| } | ||
| return errors.NewProcessingError("failed to batch-decorate previous outputs", batchErr) |
There was a problem hiding this comment.
This PR introduces substantial additional behavior beyond subtree write batching (two-phase transaction extension plus new SQL query strategy in BatchPreviousOutputsDecorate). The PR title/description currently focus on subtree batching; consider updating them to explicitly mention the previous-output batching/extendTransactions changes (or splitting into a separate PR) to make review/rollback scope clearer.
There was a problem hiding this comment.
Acknowledged. The previous-outputs batch-decoration / two-phase extend work is logically coupled to the subtree write batching (both are catch-up throughput optimisations that only activate in quickValidationMode), but the title does understate the scope. I will update the PR title/description to call out both changes so the review/rollback unit is explicit.
| // Stop drains and shuts down. Returns the error from the final flush, if any. | ||
| func (b *SubtreeWriteBatcher) Stop(ctx context.Context) error { | ||
| if err := b.takeLastErr(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| b.mu.Lock() | ||
| if b.stopped { | ||
| b.mu.Unlock() | ||
| return nil | ||
| } | ||
| b.stopped = true | ||
| pending := b.buf | ||
| b.buf = nil | ||
| close(b.stopCh) | ||
| b.mu.Unlock() | ||
|
|
||
| b.wg.Wait() | ||
|
|
||
| if len(pending) > 0 { | ||
| return b.flushFn(ctx, pending) | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Stop() calls takeLastErr() only at the beginning. If a timer-path flush is in progress while Stop is called, it can fail and set b.lastErr after the initial takeLastErr, and then Stop will return nil (or only the pending-flush error) and the timer error will never be surfaced because the batcher is stopped. Consider calling takeLastErr() again after b.wg.Wait() (and possibly after the final pending flush) and returning/combining that error so timer flush failures aren’t lost during shutdown.
There was a problem hiding this comment.
Fixed. Stop now re-reads takeLastErr() after wg.Wait() and the final pending flush, so timer-path errors captured during shutdown are surfaced. If the final pending flush itself fails, that error takes precedence (it is the batch the caller most directly relied on being persisted) and any queued timer error is cleared so it is not returned by a future Stop call.
- SubtreeWriteBatcher: add inflight WaitGroup so Stop() waits for count-threshold flushes started by Submit() before returning - SubtreeWriteBatcher: Submit() now takes ctx and forwards it to flushFn so caller cancellation/tracing propagates on count-threshold flushes - SubtreeWriteBatcher: Stop() re-reads takeLastErr() after final pending flush so timer-path errors during shutdown are surfaced - handle_block: return wrapped error from writeSubtree when Stop fails on quickValidationMode transition instead of falling through to the direct path (fail fast on missing blobs) - handle_block: move txWrapper.SomeParentsInBlock = true outside the errgroup goroutine to eliminate -race data race - subtree_write_batcher_test: replace fixed time.Sleep with a signal channel + require.Eventually for deterministic timer-flush wait Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| } | ||
|
|
||
| // extendFromTxMap populates a transaction's inputs whose parents are in the same | ||
| // block (available via txMap). It waits up to 120 seconds for each same-block parent |
There was a problem hiding this comment.
[Minor] Documentation inaccuracy
The comment says "It waits up to 120 seconds for each same-block parent" but the implementation does not wait at all. Looking at lines 1138-1141 below, the code directly reads parent outputs without any polling or timeout:
// Parent's Outputs are populated at wire-parse time and never mutated
// afterwards, so we can read them immediately without waiting for the
// parent tx itself to finish being extended.Suggested fix: Remove the "waits up to 120 seconds" phrase. For example:
// extendFromTxMap populates a transaction's inputs whose parents are in the same
// block (available via txMap). Parent outputs are read directly from the txMap
// without waiting, as they are populated at wire-parse time. Child and parent may be
// processed by different goroutines in the enclosing errgroup.
There was a problem hiding this comment.
✅ Fixed in latest commit
The docstring at lines 1090-1098 has been corrected and no longer mentions waiting. It accurately describes the immediate read behavior.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| tx := txWrapper.Tx | ||
| txs = append(txs, tx) | ||
|
|
||
| g.Go(func() error { | ||
| if err := sm.extendFromTxMap(gCtx, tx, txMap); err != nil { | ||
| return errors.NewTxError("failed to extend transaction from txMap", err) | ||
| } | ||
| return nil | ||
| }) |
There was a problem hiding this comment.
The goroutine launched in this loop captures the tx variable from the surrounding loop scope. Since tx is reassigned each iteration, all goroutines may end up calling extendFromTxMap on the last transaction (classic closure-capture bug), leading to incorrect/missing input decoration and potential data races. Capture the per-iteration tx in a new variable (e.g., txLocal := txWrapper.Tx) and have the closure reference that (or pass it as an argument) before calling g.Go.
There was a problem hiding this comment.
Not a bug — tx := txWrapper.Tx is declared inside the loop body with :=, so it is a fresh variable in each iteration (block-scoped, not the loop header), and each g.Go closure captures its own tx. This is independent of the Go 1.22 per-iteration-loop-variable change. Additionally this module is on Go 1.26, where even the loop-header variables are per-iteration. The existing txs = append(txs, tx) call on the preceding line would also be broken if the closure capture were wrong, but phase 2 consumes txs correctly in the tests.
| // block (available via txMap). It waits up to 120 seconds for each same-block parent | ||
| // to be fully extended itself, which is necessary because child and parent may be | ||
| // processed by different goroutines in the enclosing errgroup. |
There was a problem hiding this comment.
The doc comment for extendFromTxMap says it “waits up to 120 seconds for each same-block parent to be fully extended”, but the implementation no longer waits (and explicitly states waiting is unnecessary). Please update the comment to match the current behavior to avoid misleading future maintainers.
| // block (available via txMap). It waits up to 120 seconds for each same-block parent | |
| // to be fully extended itself, which is necessary because child and parent may be | |
| // processed by different goroutines in the enclosing errgroup. | |
| // block (available via txMap). Same-block parent outputs are read directly from | |
| // txMap without waiting for the parent transaction to become fully extended, | |
| // because outputs are populated at wire-parse time and are not mutated during | |
| // input extension. |
There was a problem hiding this comment.
Fixed — updated the extendFromTxMap doc comment to state that same-block parent Outputs are read directly without waiting, with a brief explanation of why the read is safe (Outputs are populated at wire-parse time and not mutated during input extension). Dropped the stale "waits up to 120 seconds" language.
| // Phase 2: for inputs whose parents are NOT same-block, batch the decoration into | ||
| // a single IN-clause DB query per chunk instead of issuing one per tx. For a | ||
| // 20k-tx block this collapses ~20k round-trips into O(N / maxINClauseSize). | ||
| // | ||
| // BatchPreviousOutputsDecorate skips inputs that already have PreviousTxScript set, | ||
| // so Phase 1's work is preserved. If it returns a processing/not-found error the | ||
| // most likely cause is a parent that's been pruned (DAH'd) because the child | ||
| // already had a prior processing pass. Fall back to per-tx decoration so the | ||
| // existing recovery path (utxoStore.Get on the child itself) can still kick in. | ||
| if batchErr := sm.utxoStore.BatchPreviousOutputsDecorate(ctx, txs); batchErr != nil { | ||
| if errors.Is(batchErr, errors.ErrProcessing) || errors.Is(batchErr, errors.ErrTxNotFound) { | ||
| return sm.extendPerTxFallback(ctx, txs) | ||
| } | ||
| return errors.NewProcessingError("failed to batch-decorate previous outputs", batchErr) |
There was a problem hiding this comment.
This function now always runs the two-phase + BatchPreviousOutputsDecorate path, but the PR description says the behavior is gated on quickValidationMode and that non-quick blocks should fall back to the original per-tx PreviousOutputsDecorate loop. Since extendTransactions is called before quickValidationMode is computed in prepareSubtrees, this change currently applies in all modes. If the gating is required, pass quickValidationMode into extendTransactions (or move the call until after it’s determined) and keep the legacy per-tx decoration for non-quick blocks.
There was a problem hiding this comment.
Correct observation — the PR description was misleading on this point. The two-phase + BatchPreviousOutputsDecorate path is intentionally not gated on quickValidationMode: it is a functional-equivalent refactor of the same decoration step, applied to every legacy-sync block. BatchPreviousOutputsDecorate skips inputs that phase 1 already populated and returns the same per-input result the old per-tx loop produced, just via a single batched query; on a missing-parent / processing error it falls back to the original extendPerTxFallback path so the DAH-recovery via utxoStore.Get(fields.Tx) still applies. The quickValidationMode gating claim in the earlier description draft was wrong — I have rewritten the PR description to make the subtree-batcher-only gating explicit and explain why the decoration refactor is safe to apply universally.
- extendFromTxMap: update stale doc comment that claimed the function still waits up to 120 seconds for parents to be extended. It no longer does — outputs are read directly from txMap because parent Outputs are populated at wire-parse time and not mutated during input extension. Also updated the PR description to remove the incorrect claim that the two-phase BatchPreviousOutputsDecorate path is gated on quickValidationMode (it is not — it is a functional-equivalent refactor applied to every legacy-sync block, with the per-tx fallback preserving DAH'd-parent recovery). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| tx.Inputs[i].PreviousTxSatoshis = prevTxWrapper.Tx.Outputs[input.PreviousTxOutIndex].Satoshis | ||
| tx.Inputs[i].PreviousTxScript = bscript.NewFromBytes(*prevTxWrapper.Tx.Outputs[input.PreviousTxOutIndex].LockingScript) |
There was a problem hiding this comment.
extendFromTxMap indexes prevTxWrapper.Tx.Outputs[input.PreviousTxOutIndex] without validating that the referenced output index exists. A malformed/invalid block could cause an out-of-range panic here before normal validation has a chance to return a structured error. Add a bounds check on input.PreviousTxOutIndex against len(prevTxWrapper.Tx.Outputs) and return a processing/tx error when the outpoint is invalid.
There was a problem hiding this comment.
Fixed. Added an explicit bounds check on input.PreviousTxOutIndex before indexing into prevTxWrapper.Tx.Outputs; returns an errors.NewTxError with the offending tx, input index, outpoint index, and parent hash if the outpoint is out of range. The error propagates through the errgroup back up to extendTransactions, which wraps it as a processing error so the block is rejected via the normal error path rather than panicking mid-goroutine.
| if sm.subtreeWriteBatcher != nil { | ||
| if err := sm.subtreeWriteBatcher.Stop(context.Background()); err != nil { | ||
| sm.logger.Errorf("[SyncManager] SubtreeWriteBatcher.Stop: %v", err) | ||
| } | ||
| sm.subtreeWriteBatcher = nil | ||
| } | ||
|
|
||
| return nil |
There was a problem hiding this comment.
SyncManager.Stop() logs and discards errors returned by SubtreeWriteBatcher.Stop(), but Stop() itself returns an error. If the drain fails, this can silently lose buffered subtree writes during shutdown. Consider propagating the error (or aggregating it with other shutdown errors) so callers can detect an unclean shutdown.
| if sm.subtreeWriteBatcher != nil { | |
| if err := sm.subtreeWriteBatcher.Stop(context.Background()); err != nil { | |
| sm.logger.Errorf("[SyncManager] SubtreeWriteBatcher.Stop: %v", err) | |
| } | |
| sm.subtreeWriteBatcher = nil | |
| } | |
| return nil | |
| var stopErr error | |
| if sm.subtreeWriteBatcher != nil { | |
| if err := sm.subtreeWriteBatcher.Stop(context.Background()); err != nil { | |
| sm.logger.Errorf("[SyncManager] SubtreeWriteBatcher.Stop: %v", err) | |
| stopErr = err | |
| } | |
| sm.subtreeWriteBatcher = nil | |
| } | |
| return stopErr |
There was a problem hiding this comment.
Fixed. SyncManager.Stop() now captures the drain error into stopErr (in addition to logging it) and returns that from Stop so callers can detect an unclean shutdown. Used stopErr rather than a multi-error aggregator because this is currently the only shutdown-path operation that returns an error; we can promote to a multi-error pattern later if more sources emerge.
- extendFromTxMap: bounds-check input.PreviousTxOutIndex against len(prevTxWrapper.Tx.Outputs) before indexing, and return a structured TxError rather than panicking on a malformed block. - manager.Stop: capture and propagate SubtreeWriteBatcher.Stop() error to the caller so an unclean shutdown (buffered subtree writes lost) is observable instead of silently logged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // worst-case allocation — subtreeData.Serialize() alone can produce a | ||
| // multi-GB buffer on the largest historical blocks. flushSubtreeWriteBatch | ||
| // is still defensive against ErrBlobAlreadyExists inside NewFileStorer, but | ||
| // catching it here avoids the allocation entirely. | ||
| treeExists, _ := sm.subtreeStore.Exists(ctx, treeRootHash[:], treeFileType) | ||
| dataExists, _ := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | ||
| metaExists, _ := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) |
There was a problem hiding this comment.
The Exists() calls here intentionally skip serialization when blobs are already present, but they currently ignore any Exists() error. If Exists fails (e.g., store unavailable), the code will proceed to Serialize() and Submit anyway, which can trigger the very multi-GB allocations this guard is meant to avoid and also hides the underlying storage failure. Capture and propagate the Exists error (wrap as a storage/processing error) rather than discarding it.
| // worst-case allocation — subtreeData.Serialize() alone can produce a | |
| // multi-GB buffer on the largest historical blocks. flushSubtreeWriteBatch | |
| // is still defensive against ErrBlobAlreadyExists inside NewFileStorer, but | |
| // catching it here avoids the allocation entirely. | |
| treeExists, _ := sm.subtreeStore.Exists(ctx, treeRootHash[:], treeFileType) | |
| dataExists, _ := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | |
| metaExists, _ := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) | |
| // blocks, skipping Serialize() on blobs that are already on disk saving the | |
| // worst-case allocation — subtreeData.Serialize() alone can produce a | |
| // multi-GB buffer on the largest historical blocks. flushSubtreeWriteBatch | |
| // is still defensive against ErrBlobAlreadyExists inside NewFileStorer, but | |
| // catching it here avoids the allocation entirely. | |
| treeExists, err := sm.subtreeStore.Exists(ctx, treeRootHash[:], treeFileType) | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] check subtree subtree exists", err) | |
| } | |
| dataExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] check subtree data exists", err) | |
| } | |
| metaExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] check subtree meta exists", err) | |
| } |
There was a problem hiding this comment.
Fixed. All three Exists() calls in the batched writeSubtree path now propagate errors as [writeSubtree] check subtree/data/meta exists storage errors rather than swallowing them — a store failure now aborts the block before any Serialize() allocation instead of silently proceeding.
| case SubtreeKindMeta: | ||
| // Mirror the existence check from writeSubtreeDirect: skip if already present | ||
| // (e.g., created by block assembly via P2P). | ||
| if exists, _ := sm.subtreeStore.Exists(gCtx, item.RootHash[:], fileformat.FileTypeSubtreeMeta); exists { |
There was a problem hiding this comment.
This meta existence check also discards the error return from Exists(). If Exists fails, the flush will proceed as if the blob is absent, potentially doing unnecessary work and masking a store failure until later. Prefer handling the error explicitly (return a storage/processing error) so the batch flush fails with the real root cause.
| if exists, _ := sm.subtreeStore.Exists(gCtx, item.RootHash[:], fileformat.FileTypeSubtreeMeta); exists { | |
| exists, err := sm.subtreeStore.Exists(gCtx, item.RootHash[:], fileformat.FileTypeSubtreeMeta) | |
| if err != nil { | |
| return errors.NewStorageError("flushSubtreeWriteBatch: check meta exists", err) | |
| } | |
| if exists { |
There was a problem hiding this comment.
Fixed. flushSubtreeWriteBatch now propagates the Exists() error for the meta existence check as a storage error instead of discarding it.
| // are dropped from the buffer and the error is surfaced to the caller via one of: | ||
| // - Submit() returns the error directly (count-threshold path). | ||
| // - The next Submit() or Stop() returns lastErr (timer path). | ||
| // - Stop() returns the final-flush error and/or lastErr. |
There was a problem hiding this comment.
The comment bullet says "Stop() returns the final-flush error and/or lastErr", but Stop currently returns either finalErr (preferred) or lastErr (and clears lastErr when returning finalErr). Update this comment to match the actual semantics (it never returns both errors).
| // - Stop() returns the final-flush error and/or lastErr. | |
| // - Stop() returns the final-flush error if one occurs; otherwise it returns lastErr. |
There was a problem hiding this comment.
Fixed. Updated the bullet to match the actual code: Stop() returns the final-flush error if one occurs, otherwise it returns the captured lastErr (and clears it).
- Propagate Exists() errors from writeSubtree's pre-serialization checks and from flushSubtreeWriteBatch's meta short-circuit, instead of silently proceeding on store failures. - Correct SubtreeWriteBatcher type-doc bullet to match Stop()'s actual semantics (final-flush error OR lastErr, never both). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (sm *SyncManager) flushSubtreeWriteBatch(ctx context.Context, items []SubtreeWriteItem) error { | ||
| g, gCtx := errgroup.WithContext(ctx) | ||
| util.SafeSetLimit(g, 8) | ||
|
|
There was a problem hiding this comment.
flushSubtreeWriteBatch uses errgroup.WithContext(ctx), which cancels gCtx on the first item error. That can cause other item writes in the same batch to fail with context cancellation (and potentially not be attempted), which conflicts with the batcher’s doc that a single item failure should not prevent processing the rest of the batch. Consider using an errgroup.Group without a derived cancellation context (or otherwise ensure one item failure doesn’t cancel siblings) while still honoring the caller ctx for shutdown/timeouts.
There was a problem hiding this comment.
Addressed by clarifying the doc. The fail-fast cancellation of sibling writes is intentional and consistent with the batcher's block-level fail-fast semantics (documented under 'Flush failure semantics'): any flush error aborts the enclosing block, and the operator restarts catch-up — we never want to silently persist a partial set of per-block blobs. Tightened the SubtreeWriteFlushFunc doc to say implementations must not silently skip items (i.e. every item must be attempted or explicitly accounted for), but fail-fast cancellation on first error is acceptable and desirable here.
| if err != nil { | ||
| if errors.Is(err, errors.ErrBlobAlreadyExists) { | ||
| return nil | ||
| } | ||
| return errors.NewStorageError("flushSubtreeWriteBatch: create file", err) | ||
| } | ||
| var ok bool | ||
| defer func() { | ||
| if !ok { | ||
| storer.Abort(errors.NewProcessingError("flushSubtreeWriteBatch: write aborted")) | ||
| } | ||
| }() | ||
| if _, err := storer.Write(item.Bytes); err != nil { | ||
| return errors.NewStorageError("flushSubtreeWriteBatch: write", err) | ||
| } | ||
| if err := storer.Close(gCtx); err != nil { | ||
| return errors.NewStorageError("flushSubtreeWriteBatch: close", err) | ||
| } |
There was a problem hiding this comment.
Errors returned from flushSubtreeWriteBatch wrap the underlying error but omit the item identifiers (kind/root hash/file type). This will make it hard to diagnose which blob failed in production when a batch flush fails. Consider including at least item.Kind and item.RootHash (and resolved fileType) in the StorageError messages emitted from this flush path.
There was a problem hiding this comment.
Fixed. Every error returned from flushSubtreeWriteBatch now includes the failing item's kind=<n> type=<fileType> hash=<rootHash> identifiers (rootHash is formatted once per item). Operators can grep logs by root hash to locate the exact blob without cross-referencing batch offsets.
| func NewSubtreeWriteBatcher(maxBlocks int, maxWait time.Duration, logger batcherLogger, flushFn SubtreeWriteFlushFunc) *SubtreeWriteBatcher { | ||
| if maxBlocks < 1 { | ||
| maxBlocks = 1 | ||
| } | ||
| if maxWait < 10*time.Millisecond { | ||
| maxWait = 10 * time.Millisecond | ||
| } | ||
| b := &SubtreeWriteBatcher{ | ||
| maxItems: maxBlocks * 3, | ||
| maxWait: maxWait, | ||
| flushFn: flushFn, | ||
| logger: logger, | ||
| stopCh: make(chan struct{}), | ||
| } |
There was a problem hiding this comment.
NewSubtreeWriteBatcher stores flushFn without validating it. If a caller accidentally passes a nil flushFn, the first flush will panic when b.flushFn is invoked. Consider defensively panicking early with a clear message or otherwise preventing construction with a nil flushFn.
There was a problem hiding this comment.
Fixed. NewSubtreeWriteBatcher now panics at construction when flushFn is nil, with a message noting this is a programming error (no sensible default for 'discard writes').
- NewSubtreeWriteBatcher panics on nil flushFn at construction to fail loudly rather than at first flush with a nil pointer dereference. - flushSubtreeWriteBatch error messages now include kind/type/rootHash identifiers so operators can trace the failing blob from logs. - Clarify SubtreeWriteFlushFunc doc: implementations must not silently skip items, but fail-fast cancellation of sibling writes on first error is acceptable (matches the block-level fail-fast semantics). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return errors.NewStorageError("[writeSubtree] check subtree exists", err) | ||
| } | ||
| dataExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | ||
| if err != nil { | ||
| return errors.NewStorageError("[writeSubtree] check subtree data exists", err) | ||
| } | ||
| metaExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) | ||
| if err != nil { | ||
| return errors.NewStorageError("[writeSubtree] check subtree meta exists", err) | ||
| } | ||
|
|
||
| if !treeExists { | ||
| subtreeBytes, err := subtree.Serialize() | ||
| if err != nil { | ||
| return errors.NewStorageError("[writeSubtree] serialize subtree", err) | ||
| } | ||
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindTree, FileType: treeFileType, RootHash: treeRootHash, Bytes: subtreeBytes, DeleteAt: dah}); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| if !dataExists { | ||
| dataBytes, err := subtreeData.Serialize() | ||
| if err != nil { | ||
| return errors.NewStorageError("[writeSubtree] serialize subtree data", err) | ||
| } | ||
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindData, RootHash: dataRootHash, Bytes: dataBytes, DeleteAt: dah}); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| if !metaExists { | ||
| metaBytes, err := subtreeMetaData.Serialize() | ||
| if err != nil { | ||
| return errors.NewStorageError("[writeSubtree] serialize subtree meta", err) | ||
| } | ||
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindMeta, RootHash: dataRootHash, Bytes: metaBytes, DeleteAt: dah}); err != nil { | ||
| return err |
There was a problem hiding this comment.
In the batched path, several new errors lose the useful per-subtree context that the direct path includes (e.g., subtree root hash / block hash+height). For example, the serialize/existence-check failures here return generic messages like "[writeSubtree] serialize subtree data" without identifying which subtree/block triggered it, making production diagnosis harder. Consider including at least subtree.RootHash() (and ideally block hash/height and item kind/fileType) in these error messages, consistent with writeSubtreeDirect’s error formatting.
| return errors.NewStorageError("[writeSubtree] check subtree exists", err) | |
| } | |
| dataExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] check subtree data exists", err) | |
| } | |
| metaExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] check subtree meta exists", err) | |
| } | |
| if !treeExists { | |
| subtreeBytes, err := subtree.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] serialize subtree", err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindTree, FileType: treeFileType, RootHash: treeRootHash, Bytes: subtreeBytes, DeleteAt: dah}); err != nil { | |
| return err | |
| } | |
| } | |
| if !dataExists { | |
| dataBytes, err := subtreeData.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] serialize subtree data", err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindData, RootHash: dataRootHash, Bytes: dataBytes, DeleteAt: dah}); err != nil { | |
| return err | |
| } | |
| } | |
| if !metaExists { | |
| metaBytes, err := subtreeMetaData.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError("[writeSubtree] serialize subtree meta", err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindMeta, RootHash: dataRootHash, Bytes: metaBytes, DeleteAt: dah}); err != nil { | |
| return err | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] check subtree exists: block_hash=%s block_height=%d subtree_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), SubtreeKindTree, treeFileType), err) | |
| } | |
| dataExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeData) | |
| if err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] check subtree data exists: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindData, fileformat.FileTypeSubtreeData), err) | |
| } | |
| metaExists, err := sm.subtreeStore.Exists(ctx, dataRootHash[:], fileformat.FileTypeSubtreeMeta) | |
| if err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] check subtree meta exists: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindMeta, fileformat.FileTypeSubtreeMeta), err) | |
| } | |
| if !treeExists { | |
| subtreeBytes, err := subtree.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] serialize subtree: block_hash=%s block_height=%d subtree_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), SubtreeKindTree, treeFileType), err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindTree, FileType: treeFileType, RootHash: treeRootHash, Bytes: subtreeBytes, DeleteAt: dah}); err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] submit subtree write: block_hash=%s block_height=%d subtree_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), treeRootHash.String(), SubtreeKindTree, treeFileType), err) | |
| } | |
| } | |
| if !dataExists { | |
| dataBytes, err := subtreeData.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] serialize subtree data: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindData, fileformat.FileTypeSubtreeData), err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindData, RootHash: dataRootHash, Bytes: dataBytes, DeleteAt: dah}); err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] submit subtree data write: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindData, fileformat.FileTypeSubtreeData), err) | |
| } | |
| } | |
| if !metaExists { | |
| metaBytes, err := subtreeMetaData.Serialize() | |
| if err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] serialize subtree meta: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindMeta, fileformat.FileTypeSubtreeMeta), err) | |
| } | |
| if err := sm.subtreeWriteBatcher.Submit(ctx, SubtreeWriteItem{Kind: SubtreeKindMeta, RootHash: dataRootHash, Bytes: metaBytes, DeleteAt: dah}); err != nil { | |
| return errors.NewStorageError(fmt.Sprintf("[writeSubtree] submit subtree meta write: block_hash=%s block_height=%d subtree_root=%s data_root=%s kind=%s file_type=%s", | |
| block.Hash().String(), block.Height(), subtree.RootHash().String(), dataRootHash.String(), SubtreeKindMeta, fileformat.FileTypeSubtreeMeta), err) |
There was a problem hiding this comment.
Fixed. All batched-path errors in writeSubtree now include block_hash, block_height, subtree_root, and (where applicable) data_root/file_type identifiers — matching the formatting in writeSubtreeDirect. Hoisted the formatted strings to locals so each error line stays readable while preserving the context.
Batched-path writeSubtree errors now include block_hash, block_height, subtree_root, and (where relevant) data_root/file_type identifiers so operators can trace a failing blob from a single log line, matching writeSubtreeDirect's error formatting. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Adapted SubtreeWriteBatcher integration to upstream's multi-subtree prepareSubtrees: dropped the now-duplicate inline body and reinserted the batcher construction block before the writeSubtree loop.
oskarszoon
left a comment
There was a problem hiding this comment.
Approve. Conservative default, drain on mode-transition and Stop, replay-safe via blob existence checks. No races.
Before flipping batchBlocks > 1 in prod:
- Bench number missing. ~10.9 GB subtreeData ×
batchBlocks=10× 3 blob types = ~100 GB peak buffer on big historical blocks. Operator-acknowledged, just stating the bound. - One before/after data point on blocks/s would lock the perf claim in.
| // last valid block. See the type-level "Flush failure semantics" doc for | ||
| // why requeueing is worse than dropping here. | ||
| return b.flushFn(ctx, toFlush) | ||
| } |
There was a problem hiding this comment.
[Critical] This is a design-level concern for multi-block batches.
The documented rationale (lines 73-83) argues that requeueing would "conflate rejected block writes with next block writes" and is worse than fail-fast. However, this conflates two scenarios:
- Permanent errors (bad data, logic bugs) — fail-fast is correct; the block is invalid
- Transient errors (disk full, EAGAIN, network timeout on remote blob store) — dropping the batch forces a full catch-up restart from the last checkpoint, discarding already-materialized work
For multi-block batches (batchBlocks>1), a single transient error can lose writes from multiple blocks that have already been successfully serialized and validated.
Suggested mitigation: Add a retry loop (max 3 attempts with backoff) for known-transient error types before dropping the batch, or implement a quarantine mechanism that preserves the failed batch for operator inspection/manual retry.
|
|
Closing: the core premise doesn't hold. The batcher writes the identical set of files/fsyncs/bytes to disk as the direct path (no coalescing — flushSubtreeWriteBatch is one NewFileStorer/Write/Close per blob, same as before), so it cannot relieve an I/O-bound disk. The only mechanical change is widening write concurrency from 3 to 8, and the dominant count-triggered flush still runs synchronously on the block-processing goroutine. The motivating signal (sdb ~90% %util) doesn't establish I/O saturation on SSD/NVMe anyway. If catch-up is genuinely fsync-bound, the right lever is fewer fsyncs (e.g. relax fsyncMode for checkpoint-trusted below-checkpoint blocks) — not in-memory batching of the same write set. Needs a proper measurement before revisiting. |



Problem
During legacy catch-up, `writeSubtree` issues one filestore write + fsync per subtree blob (tree + data + meta = 3 per block). On mainnet this contributed to `sdb` 90% utilisation bursts in a profile captured 2026-04-17, with teranode only at 9% CPU — the node was I/O-bound on the subtree filestore writes.
Fix — SubtreeWriteBatcher gated on `quickValidationMode`
Introduce a `SubtreeWriteBatcher` behind `writeSubtree`, active only while `quickValidationMode` is true (i.e. the LEGACYSYNCING / below-checkpoint catch-up window where `checkSubtreeFromBlock` is already skipped).
`writeSubtree` becomes a dispatcher:
Flush triggers:
Gating on `SubtreeWriteBatchBlocks > 1`
At the default `batchBlocks=1` the batcher would flush after every block anyway (3 items per block = 1 block × 3), so there's no cross-block coalescing to gain — but the batched path still pays the cost of calling `subtreeData.Serialize()`, which can allocate ~10.9 GB for the largest historical blocks. `writeSubtreeDirect` streams subtreeData via `WriteTransactionsToWriter` and avoids that allocation.
So at `batchBlocks=1` we skip the batcher entirely and let `writeSubtree` dispatch straight to `writeSubtreeDirect`. The batcher is only constructed when operators explicitly opt in to cross-block coalescing (`batchBlocks>1`), in which case they have accepted the materialisation cost for the perf benefit.
Settings
Test plan
🤖 Generated with Claude Code