Refactor request_storage_ranges for correctness, readability, and performance #6140

@pablodeymo

Plan: Refactor request_storage_ranges

File: crates/networking/p2p/snap/client.rs (lines 531-1055)
Goal: Full overhaul — readability, performance, and correctness


Current Problems

#  | Category    | Problem                                                          | Lines
P1 | Correctness | panic!("Should have found the account hash")                     | 734-736
P2 | Correctness | .expect() crashes node if store lookup fails                     | 556-558
P3 | Readability | Big-account chunking duplicated (~70 lines), has TODO: DRY       | 813-883
P4 | Readability | 530-line function, 5+ nesting levels                             | 531-1055
P5 | Readability | ensure_dir_exists pattern repeated 4 times                       | 165-173, 297-303, 618-626, 1014-1022
P6 | Performance | Busy-poll: try_recv() + sleep(10ms)                              | 652, 970
P7 | Performance | Buffer size recomputed every iteration                           | 609-613
P8 | Robustness  | accounts_done HashMap is unnecessary indirection (has TODO)      | 599-600
P9 | Robustness  | task_count/completed_tasks as bare ints, easy to desync          | scattered

Step-by-Step Plan

Each step is one independently correct commit.

Step 1: Replace panic! with proper error (P1)

What: The panic!("Should have found the account hash") at line 735 crashes the node if a "not-actually-big" account hash can't be found in the intervals map. This is a recoverable error.

Current code (lines 718-736):

let mut acc_hash: H256 = H256::zero();
for account in accounts_by_root_hash[remaining_start].1.iter() {
    if let Some((_, old_intervals)) = account_storage_roots
        .accounts_with_storage_root
        .get(account)
    {
        if !old_intervals.is_empty() {
            acc_hash = *account;
        }
    } else {
        continue;
    }
}
if acc_hash.is_zero() {
    panic!("Should have found the account hash");
}

Change: Replace the if acc_hash.is_zero() { panic!(...) } with:

if acc_hash.is_zero() {
    return Err(SnapError::InternalError(
        "Should have found the account hash in intervals map".to_owned(),
    ));
}

Files: client.rs
Risk: Very low — same behavior path, just returns error instead of crashing.
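
A possible follow-up to this step is to drop the zero-hash sentinel entirely and let the lookup return a Result. The sketch below is only an illustration under assumptions: H256 is replaced by a 32-byte array, SnapError is reduced to one variant, and find_account_with_intervals is a hypothetical helper name. It keeps the behavior of the loop above, which does not break early and therefore selects the last account with non-empty intervals.

```rust
use std::collections::BTreeMap;

// Stand-ins for the real types; assumptions for illustration only.
type H256 = [u8; 32];
type Intervals = Vec<(H256, H256)>;

#[derive(Debug, PartialEq)]
enum SnapError {
    InternalError(String),
}

/// Hypothetical helper: returns the last account in `accounts` that still has
/// pending intervals, or an error instead of panicking when none is found.
fn find_account_with_intervals(
    accounts: &[H256],
    accounts_with_storage_root: &BTreeMap<H256, (Option<H256>, Intervals)>,
) -> Result<H256, SnapError> {
    accounts
        .iter()
        .filter(|account| {
            accounts_with_storage_root
                .get(*account)
                .is_some_and(|(_, intervals)| !intervals.is_empty())
        })
        .last()
        .copied()
        .ok_or_else(|| {
            SnapError::InternalError(
                "Should have found the account hash in intervals map".to_owned(),
            )
        })
}
```

Returning the hash through a Result also avoids treating a genuine all-zero hash as "not found".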


Step 2: Replace .expect() with ? (P2)

What: Lines 554-558 use two .expect() calls that crash the node if the store lookup fails or the account doesn't exist. During snap sync, peers can provide incomplete data, so both failures are possible.

Current code (lines 554-558):

let root = store
    .get_account_state_by_acc_hash(pivot_header.hash(), *account)
    .expect("Failed to get account in state trie")
    .expect("Could not find account that should have been downloaded or healed")
    .storage_root;

Change: Use ? and ok_or_else since SnapError already has #[from] StoreError:

let root = store
    .get_account_state_by_acc_hash(pivot_header.hash(), *account)?
    .ok_or_else(|| {
        SnapError::InternalError(format!(
            "Could not find account {account:?} that should have been downloaded or healed"
        ))
    })?
    .storage_root;

Files: client.rs
Risk: Very low — same behavior path, but now propagates error instead of panicking.
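
To show how the two failure modes map to distinct errors, here is a minimal, std-only sketch. Everything in it is a stand-in: StoreError, the reduced SnapError, the Store struct and the storage_root function are hypothetical, and the From impl is written by hand where the real SnapError uses #[from].

```rust
use std::collections::HashMap;

// Stand-in error types (assumptions, not the real definitions).
#[derive(Debug)]
struct StoreError(String);

#[derive(Debug)]
enum SnapError {
    Store(StoreError),
    InternalError(String),
}

// Hand-written equivalent of what #[from] StoreError would generate.
impl From<StoreError> for SnapError {
    fn from(err: StoreError) -> Self {
        SnapError::Store(err)
    }
}

// Hypothetical store: maps an account id to its storage root.
struct Store {
    accounts: HashMap<u64, u64>,
    broken: bool,
}

impl Store {
    fn get_storage_root(&self, account: u64) -> Result<Option<u64>, StoreError> {
        if self.broken {
            return Err(StoreError("db unavailable".to_owned()));
        }
        Ok(self.accounts.get(&account).copied())
    }
}

fn storage_root(store: &Store, account: u64) -> Result<u64, SnapError> {
    let root = store
        .get_storage_root(account)? // lookup failure: StoreError -> SnapError::Store
        .ok_or_else(|| {
            // missing account: reported as an internal error instead of a panic
            SnapError::InternalError(format!("Could not find account {account:?}"))
        })?;
    Ok(root)
}
```

The first ? handles the lookup failure and the second handles the missing account, so the caller can tell the two cases apart.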


Step 3: Extract ensure_snapshot_dir helper (P5)

What: The "check if dir exists, create if not" pattern appears 4 times with identical logic but different error messages. All use SnapError::SnapshotDir.

4 occurrences:

  1. request_account_range, line 165 — state snapshots (in-loop flush)
  2. request_account_range, line 297 — state snapshots (post-loop flush)
  3. request_storage_ranges, line 618 — storage snapshots (in-loop flush)
  4. request_storage_ranges, line 1014 — storage snapshots (post-loop flush)

Current pattern (each is 5 lines):

if !std::fs::exists(dir).map_err(|_| {
    SnapError::SnapshotDir("... directory does not exist".to_string())
})? {
    std::fs::create_dir_all(dir).map_err(|_| {
        SnapError::SnapshotDir("Failed to create ... directory".to_string())
    })?;
}

Change: Create a free function:

fn ensure_snapshot_dir(dir: &Path) -> Result<(), SnapError> {
    if !std::fs::exists(dir).map_err(|_| {
        SnapError::SnapshotDir(format!("Cannot check if {} exists", dir.display()))
    })? {
        std::fs::create_dir_all(dir).map_err(|_| {
            SnapError::SnapshotDir(format!("Failed to create directory {}", dir.display()))
        })?;
    }
    Ok(())
}

Replace all 4 occurrences with ensure_snapshot_dir(dir)?;.

Files: client.rs
Risk: Very low — pure extraction, no logic change.
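
A runnable variant of the helper, as a sketch rather than the final code: SnapError is reduced to the one variant used here, Path::try_exists is swapped in for std::fs::exists, and the underlying io::Error is appended to the message. The last two are choices of this sketch, not part of the plan.

```rust
use std::path::Path;

// Reduced stand-in for the real error type (assumption).
#[derive(Debug)]
enum SnapError {
    SnapshotDir(String),
}

fn ensure_snapshot_dir(dir: &Path) -> Result<(), SnapError> {
    // try_exists distinguishes "does not exist" from "could not check".
    let exists = dir.try_exists().map_err(|err| {
        SnapError::SnapshotDir(format!("Cannot check if {} exists: {err}", dir.display()))
    })?;
    if !exists {
        // create_dir_all also creates missing parent directories.
        std::fs::create_dir_all(dir).map_err(|err| {
            SnapError::SnapshotDir(format!(
                "Failed to create directory {}: {err}",
                dir.display()
            ))
        })?;
    }
    Ok(())
}
```

Calling it twice on the same path is harmless: the second call finds the directory and returns Ok(()).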


Step 4: Extract big-account chunking helper — DRY (P3)

What: When a big storage account is discovered, the function generates sub-chunk tasks and intervals. This logic is duplicated in two branches:

  • else branch (lines 813-847): old_intervals exists but is empty
  • None branch (lines 849-883): no old_intervals entry at all

Both branches do the same thing: insert a fresh entry in accounts_with_storage_root, then generate chunk_count tasks spanning from start_hash_u256 to U256::MAX. The code has a TODO: DRY comment.

Duplicated logic (~35 lines, appearing twice):

account_storage_roots
    .accounts_with_storage_root
    .insert(first_acc_hash, (None, vec![]));
let (_, intervals) = account_storage_roots
    .accounts_with_storage_root
    .get_mut(&first_acc_hash)
    .ok_or(...)?;

for i in 0..chunk_count {
    let start_hash_u256 = start_hash_u256 + chunk_size * i;
    let start_hash = H256::from_uint(&start_hash_u256);
    let end_hash = if i == chunk_count - 1 {
        H256::repeat_byte(0xff)
    } else {
        let end_hash_u256 = start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX);
        H256::from_uint(&end_hash_u256)
    };
    let task = StorageTask {
        start_index: remaining_start,
        end_index: remaining_start + 1,
        start_hash,
        end_hash: Some(end_hash),
    };
    intervals.push((start_hash, end_hash));
    tasks_queue_not_started.push_back(task);
    task_count += 1;
}

Change: Extract a function:

fn create_big_account_chunks(
    first_acc_hash: H256,
    remaining_start: usize,
    start_hash_u256: U256,
    chunk_size: U256,
    chunk_count: usize,
    account_storage_roots: &mut AccountStorageRoots,
    tasks_queue: &mut VecDeque<StorageTask>,
    tracker: &mut TaskTracker,
) -> Result<(), SnapError> {
    account_storage_roots
        .accounts_with_storage_root
        .insert(first_acc_hash, (None, vec![]));
    let (_, intervals) = account_storage_roots
        .accounts_with_storage_root
        .get_mut(&first_acc_hash)
        .ok_or_else(|| SnapError::InternalError("...".to_owned()))?;

    for i in 0..chunk_count {
        // ... generate tasks and intervals (identical logic)
    }
    debug!("Split big storage account into {chunk_count} chunks.");
    Ok(())
}

Replace both branches with a single call to create_big_account_chunks(...).

Files: client.rs
Risk: Low — the two branches are character-for-character identical in logic.
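
The boundary arithmetic elided in the loop body can be sketched in isolation. This is an illustration under assumptions: u128 stands in for U256, chunk_ranges is a hypothetical name, and saturating arithmetic is used throughout (the original only guards the end bound with checked_add). As in the duplicated code, the last chunk always extends to the maximum value, and each chunk's end equals the next chunk's start.

```rust
/// Hypothetical helper: computes (start, end) bounds for `chunk_count` chunks.
/// `u128` is a stand-in for the 256-bit integer type used by the real code.
fn chunk_ranges(start: u128, chunk_size: u128, chunk_count: usize) -> Vec<(u128, u128)> {
    (0..chunk_count)
        .map(|i| {
            // Saturate instead of overflowing near the top of the key space.
            let chunk_start = start.saturating_add(chunk_size.saturating_mul(i as u128));
            let chunk_end = if i == chunk_count - 1 {
                // The final chunk covers everything that remains.
                u128::MAX
            } else {
                chunk_start.saturating_add(chunk_size)
            };
            (chunk_start, chunk_end)
        })
        .collect()
}
```

Keeping the arithmetic in a pure function like this would also make the chunking testable without a store or peers.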


Step 5: Introduce TaskTracker struct (P9)

What: task_count and completed_tasks are bare usize values initialized at lines 596-597, incremented at scattered locations (lines 661, 689, 700, 766, 811, 845, 880), and compared at line 975. Easy to desync.

Current usage pattern:

let mut task_count = tasks_queue_not_started.len();  // line 596
let mut completed_tasks = 0;                          // line 597
// ...
completed_tasks += 1;                                 // line 661
task_count += 1;                                      // lines 689, 700, 766, 811, 845, 880
// ...
if completed_tasks >= task_count { break; }           // line 975

Change: Create a struct:

struct TaskTracker {
    total: usize,
    completed: usize,
}

impl TaskTracker {
    fn new(initial_count: usize) -> Self {
        Self { total: initial_count, completed: 0 }
    }
    fn add_task(&mut self) { self.total += 1; }
    fn complete_task(&mut self) { self.completed += 1; }
    fn all_complete(&self) -> bool { self.completed >= self.total }
    fn remaining(&self) -> usize { self.total.saturating_sub(self.completed) }
}

Mechanical replacements:

  • let mut task_count = ...; let mut completed_tasks = 0; → let mut tracker = TaskTracker::new(tasks_queue_not_started.len());
  • task_count += 1 → tracker.add_task()
  • completed_tasks += 1 → tracker.complete_task()
  • completed_tasks >= task_count → tracker.all_complete()
  • Debug format: task_count → tracker.total, completed_tasks → tracker.completed

Files: client.rs
Risk: Very low — mechanical renaming.
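
A small simulation of how the tracker behaves when processing one task creates more tasks, which is the situation big-account chunking produces. The drain function and its splitting rule (any task larger than 4 is split in two) are hypothetical and exist only to exercise the counters.

```rust
use std::collections::VecDeque;

struct TaskTracker {
    total: usize,
    completed: usize,
}

impl TaskTracker {
    fn new(initial_count: usize) -> Self {
        Self { total: initial_count, completed: 0 }
    }
    fn add_task(&mut self) { self.total += 1; }
    fn complete_task(&mut self) { self.completed += 1; }
    fn all_complete(&self) -> bool { self.completed >= self.total }
    fn remaining(&self) -> usize { self.total.saturating_sub(self.completed) }
}

/// Hypothetical driver: each task is a size; large tasks are split in two,
/// and the halves are queued as new tasks.
fn drain(initial: Vec<u32>) -> TaskTracker {
    let mut queue: VecDeque<u32> = initial.into();
    let mut tracker = TaskTracker::new(queue.len());
    while let Some(size) = queue.pop_front() {
        if size > 4 {
            for half in [size / 2, size - size / 2] {
                queue.push_back(half);
                tracker.add_task(); // every queued task is counted exactly once
            }
        }
        tracker.complete_task(); // the task just popped is now done
    }
    tracker
}
```

Because every push onto the queue is paired with add_task() and every pop with complete_task(), the two counters meet exactly when the queue drains.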


Step 6: Extract result processing into a method (P4)

What: The main loop body from line 652 to 948 (~300 lines) handles StorageTaskResult processing. It has 5+ nesting levels and mixes concerns: interval bookkeeping, task generation, peer scoring, metrics, and storage accumulation. Extracting it reduces the main loop to ~50 lines.

Approach: Create a state struct holding mutable references to all loop state:

enum PeerOutcome {
    Success { peer_id: H256 },
    Failure { peer_id: H256 },
    NoScoreChange,
}

struct StorageDownloadState<'a> {
    accounts_by_root_hash: &'a [(H256, Vec<H256>)],
    account_storage_roots: &'a mut AccountStorageRoots,
    tasks_queue: &'a mut VecDeque<StorageTask>,
    tracker: &'a mut TaskTracker,
    accounts_done: &'a mut HashMap<H256, Vec<(H256, H256)>>,
    current_account_storages: &'a mut BTreeMap<H256, AccountsWithStorage>,
}

impl StorageDownloadState<'_> {
    fn process_result(&mut self, result: StorageTaskResult) -> Result<PeerOutcome, SnapError> {
        // All the logic from lines 652-948 goes here.
        // Returns PeerOutcome to tell the caller whether to score the peer.
    }
}

The main loop becomes:

loop {
    // flush logic (unchanged)
    if let Ok(result) = task_receiver.try_recv() {
        match state.process_result(result)? {
            PeerOutcome::Success { peer_id } => self.peer_table.record_success(&peer_id).await?,
            PeerOutcome::Failure { peer_id } => self.peer_table.record_failure(&peer_id).await?,
            PeerOutcome::NoScoreChange => {}
        }
    }
    // stale check, peer acquisition, task dispatch (unchanged)
}

Depends on: Steps 4 and 5 (the extracted method calls create_big_account_chunks and uses TaskTracker).

Files: client.rs
Risk: Medium. This is a large mechanical extraction with no intended logic change. The compiler checks that all mutable state is still reached through the same struct fields, but it cannot confirm behavior; that requires the full snap sync run described under Verification.
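
The struct-of-mutable-references pattern can be tried out at a much smaller scale. In this sketch every type is a hypothetical reduction: peer ids are u64, TaskResult carries only a task id and the downloaded slots, the NoScoreChange variant is omitted, and the rule "empty response means failure and retry" is an assumption made for the example, not a description of the real function.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum PeerOutcome {
    Success { peer_id: u64 },
    Failure { peer_id: u64 },
}

// Hypothetical, heavily reduced result type.
struct TaskResult {
    peer_id: u64,
    task_id: u32,
    slots: Vec<u64>,
}

// Borrows the loop state mutably for as long as the struct is alive.
struct DownloadState<'a> {
    tasks_queue: &'a mut VecDeque<u32>,
    completed: &'a mut usize,
    downloaded: &'a mut Vec<u64>,
}

impl DownloadState<'_> {
    fn process_result(&mut self, result: TaskResult) -> PeerOutcome {
        if result.slots.is_empty() {
            // Nothing useful came back: queue the task again and report failure.
            self.tasks_queue.push_back(result.task_id);
            return PeerOutcome::Failure { peer_id: result.peer_id };
        }
        self.downloaded.extend(result.slots);
        *self.completed += 1;
        PeerOutcome::Success { peer_id: result.peer_id }
    }
}
```

One consequence worth planning for: while the state struct is alive, the main loop cannot touch the borrowed variables directly and has to go through the struct's fields.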


Step 7: Track buffer size incrementally (P7)

What: Every loop iteration recomputes the buffer size by iterating all entries in current_account_storages (lines 609-613):

current_account_storages
    .values()
    .map(|accounts| 32 * accounts.accounts.len() + 64 * accounts.storages.len())
    .sum::<usize>()
    > RANGE_FILE_CHUNK_SIZE

This is O(n) per iteration where n is the number of root hashes accumulated. It can be tracked incrementally for O(1).

Change: Add buffer_size: usize to StorageDownloadState. Update it:

  • On insert (lines 929-947): add 32 * accounts.len() + 64 * storages.len()
  • On flush (line 615): reset to 0 after std::mem::take

Replace the computation with self.buffer_size > RANGE_FILE_CHUNK_SIZE.

Add a debug_assert! during development that the incremental value matches the recomputed one, then remove the recomputation.

Depends on: Step 6.

Files: client.rs
Risk: Low — straightforward accounting.
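
A minimal sketch of the incremental accounting, including the temporary debug_assert. The types are stand-ins: H256 is a 32-byte array (also used in place of U256 for storage values), and StorageBuffer with its insert and flush methods is a hypothetical wrapper, not a type from the codebase.

```rust
use std::collections::BTreeMap;

// Stand-in for the real hash type (assumption).
type H256 = [u8; 32];

#[derive(Default)]
struct AccountsWithStorage {
    accounts: Vec<H256>,
    storages: Vec<(H256, H256)>,
}

#[derive(Default)]
struct StorageBuffer {
    entries: BTreeMap<H256, AccountsWithStorage>,
    buffer_size: usize,
}

impl StorageBuffer {
    // The O(n) computation the plan wants to stop doing on every iteration.
    fn recomputed_size(&self) -> usize {
        self.entries
            .values()
            .map(|e| 32 * e.accounts.len() + 64 * e.storages.len())
            .sum()
    }

    fn insert(&mut self, root: H256, accounts: Vec<H256>, storages: Vec<(H256, H256)>) {
        // O(1) update: only the newly added items are counted.
        self.buffer_size += 32 * accounts.len() + 64 * storages.len();
        let entry = self.entries.entry(root).or_default();
        entry.accounts.extend(accounts);
        entry.storages.extend(storages);
        // Development-only cross-check; compiled out in release builds.
        debug_assert_eq!(self.buffer_size, self.recomputed_size());
    }

    fn flush(&mut self) -> BTreeMap<H256, AccountsWithStorage> {
        self.buffer_size = 0;
        std::mem::take(&mut self.entries)
    }
}
```

Keeping the counter and the map behind the same two methods means there is no code path that changes one without the other.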


Step 8: Remove accounts_done HashMap (P8)

What: accounts_done: HashMap<H256, Vec<(H256, H256)>> at line 600 accumulates accounts whose storage is fully downloaded. After the main loop (lines 1043-1049), it removes them from account_storage_roots.accounts_with_storage_root:

for (account_done, intervals) in accounts_done {
    if intervals.is_empty() {
        account_storage_roots
            .accounts_with_storage_root
            .remove(&account_done);
    }
}

The code has a TODO: in a refactor, delete this replace with a structure that can handle removes.

Why it's safe to remove inline: Done accounts are never read again after being marked — the only reads of accounts_with_storage_root inside the loop use remaining_start..remaining_end indices into accounts_by_root_hash, which is a separate Vec. The accounts_with_storage_root BTreeMap is only used for interval tracking.

Change:

  1. In process_result, where accounts_done.insert(*account, vec![]) is called (lines 672, 752), replace with:
    account_storage_roots.accounts_with_storage_root.remove(account);
  2. Remove the accounts_done field from StorageDownloadState.
  3. Remove the post-loop cleanup (lines 1043-1049).

Depends on: Step 6.

Files: client.rs
Risk: Low — need to verify no other code reads accounts_with_storage_root for the removed entries during the loop. Verified by reading all access sites: only interval tracking uses get_mut(&acc_hash) where acc_hash comes from the current result's remaining_start index, not from previously completed accounts.


Step 9: Replace busy-poll with tokio::select! (P6)

What: The main loop uses try_recv() (line 652) with sleep(10ms) (line 970) when no peers are available. This wastes CPU spinning and adds latency (up to 10ms delay processing results while sleeping).

Current pattern:

loop {
    // flush
    if let Ok(result) = task_receiver.try_recv() { ... }  // non-blocking
    // stale check
    let Some((peer_id, connection)) = self.peer_table.get_best_peer(...).await ... else {
        tokio::time::sleep(Duration::from_millis(10)).await;  // busy-poll
        continue;
    };
    let Some(task) = tasks_queue.pop_front() else {
        if tracker.all_complete() { break; }
        continue;
    };
    // spawn worker
}

Change: Replace with tokio::select!:

loop {
    // flush (unchanged)
    // Keep the original termination check: recv() yields None only after
    // every sender is dropped, so do not rely on the else branch alone.
    if tracker.all_complete() { break; }
    tokio::select! {
        biased;  // prefer processing results over spawning new tasks
        Some(result) = task_receiver.recv() => {
            match state.process_result(result)? {
                PeerOutcome::Success { peer_id } => self.peer_table.record_success(&peer_id).await?,
                PeerOutcome::Failure { peer_id } => self.peer_table.record_failure(&peer_id).await?,
                PeerOutcome::NoScoreChange => {}
            }
        }
        _ = async {}, if !tasks_queue.is_empty() => {
            if block_is_stale(pivot_header) { break; }
            let Some((peer_id, connection)) = self.peer_table
                .get_best_peer(&SUPPORTED_ETH_CAPABILITIES).await
                .unwrap_or(None)
            else {
                // No peers: back off briefly, then re-enter select!, which polls recv() first
                tokio::time::sleep(Duration::from_millis(10)).await;
                continue;
            };
            // The branch guard makes this Some, but avoid a panic path anyway
            let Some(task) = tasks_queue.pop_front() else { continue };
            // spawn worker (unchanged)
        }
        else => break,  // channel closed and no tasks: done
    }
}

Cancel safety analysis:

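  • mpsc::Receiver::recv(): cancel-safe per tokio docs (messages are only consumed on Poll::Ready)
  • get_best_peer(): makes a GenServer call() which sends a request on a channel and awaits a response. Cancelling mid-await loses the response but doesn't mutate state, so the peer table remains consistent. In the structure shown above it is awaited inside a branch handler, which select! runs to completion, so this only matters if the call is later moved into a branch future.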

Depends on: Step 6 (uses process_result).

Note: The exact select! structure may need adjustment during implementation. The key insight is: we want to block on recv() when there are no tasks to dispatch, and concurrently dispatch tasks when peers are available. The biased keyword ensures results are processed before new tasks are spawned, preventing task queue growth.

Files: client.rs
Risk: Medium — changes async control flow. Needs a full snap sync run to verify. The biased mode ensures deterministic behavior (results processed before new task dispatch).
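
The blocking-receive and termination idea can be exercised without an async runtime. The sketch below deliberately swaps in std threads and std::sync::mpsc for tokio, so it does not demonstrate select! itself; run_tasks, the doubling "work" and max_in_flight are all hypothetical. What it does show is a loop that blocks on the channel instead of polling, and that needs an explicit completion check because the loop keeps its own sender alive.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::thread;

/// Hypothetical driver: doubles each task on a worker thread and sums the results.
fn run_tasks(tasks: Vec<u64>, max_in_flight: usize) -> u64 {
    let (result_sender, result_receiver) = mpsc::channel::<u64>();
    let mut tasks_queue: VecDeque<u64> = tasks.into();
    let total = tasks_queue.len();
    let max_in_flight = max_in_flight.max(1); // a limit of zero could never progress
    let (mut completed, mut in_flight, mut sum) = (0usize, 0usize, 0u64);

    // Explicit termination check: `result_sender` stays alive in this scope,
    // so `recv()` alone would never report a closed channel.
    while completed < total {
        // Dispatch as many queued tasks as the limit allows.
        while in_flight < max_in_flight {
            let Some(task) = tasks_queue.pop_front() else { break };
            let sender = result_sender.clone();
            thread::spawn(move || {
                let _ = sender.send(task * 2);
            });
            in_flight += 1;
        }
        // Block until a worker reports; no try_recv() + sleep polling.
        let Ok(result) = result_receiver.recv() else { break };
        sum += result;
        completed += 1;
        in_flight -= 1;
    }
    sum
}
```

At every iteration completed + in_flight + tasks_queue.len() equals total, so the blocking recv() is only reached when at least one worker is still due to report.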


Sequencing & Dependencies

Steps 1, 2, 3: independent, can be done in any order (very low risk)
Step 5: independent (very low risk)
Step 4: independent in logic (low risk), but the helper signature shown takes &mut TaskTracker, so it should land after Step 5
Step 6: depends on Steps 4 and 5 (medium risk, large extraction)
Steps 7, 8: depend on Step 6, independent of each other (low risk)
Step 9: depends on Step 6 (medium risk, async control flow change)

Recommended execution order: 1 → 2 → 3 → 5 → 4 → 6 → 7 → 8 → 9

Critical Files

File | Role
crates/networking/p2p/snap/client.rs | Primary: the function being refactored
crates/networking/p2p/snap/error.rs | Error types: SnapError has all needed variants (InternalError, SnapshotDir, Store)
crates/networking/p2p/sync.rs:164-174 | AccountStorageRoots struct: has accounts_with_storage_root: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)> and healed_accounts: HashSet<H256>
crates/networking/p2p/utils.rs:174-180 | AccountsWithStorage struct: accounts: Vec<H256>, storages: Vec<(H256, U256)>
crates/networking/p2p/sync/snap_sync.rs:367-376 | Caller site: calls request_storage_ranges with &mut storage_accounts
crates/networking/p2p/snap/client.rs:66-82 | StorageTask and StorageTaskResult structs used throughout

Verification

Per step: cargo clippy -p ethrex-p2p && cargo build -p ethrex-p2p && cargo test -p ethrex-p2p

For Steps 6 and 9: full snap sync run against mainnet/testnet (no unit tests exist for this function).

Out of Scope (future follow-up)

  • Changing AccountsWithStorage.accounts from Vec<H256> to Arc<Vec<H256>> to avoid cloning — low priority, small cost
  • Adding unit tests for the extracted process_result method — good follow-up once extraction is done
  • Refactoring request_account_range similarly (it has the same busy-poll pattern)
