Fix snapshot #3932
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Pull request overview
This PR significantly refactors snapshot building and replay in the Rooch pruner. The changes improve snapshot integrity verification and state root handling, and introduce feature-gated tests.
Changes:
- Refactored snapshot building to use MoveOSStore directly instead of raw RocksDB, with added integrity verification
- Overhauled incremental replay to checkpoint from live store, normalize changesets, and properly handle state root transitions
- Added feature flags (gc-tests, gc-perf-tests) to conditionally compile expensive garbage collection tests
- Updated output directory structure to create properly nested store paths
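
The feature gating described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual test code: the helper `is_reachable` and the module `gc_tests` are hypothetical names, and the `[features]` entry shown in the comment is an assumption about how the flag is declared in Cargo.toml.

```rust
// Assumed Cargo.toml declaration (not copied from the PR):
//
// [features]
// gc-tests = []

/// Hypothetical cheap helper that is always compiled.
fn is_reachable(marked: &[bool], index: usize) -> bool {
    marked.get(index).copied().unwrap_or(false)
}

/// This module is only compiled when running tests AND the `gc-tests`
/// feature is enabled, so a default `cargo test` skips it entirely.
#[cfg(all(test, feature = "gc-tests"))]
mod gc_tests {
    use super::*;

    #[test]
    fn marks_are_respected() {
        assert!(is_reachable(&[true, false], 0));
        assert!(!is_reachable(&[true, false], 1));
    }
}
```

With a layout like this, the gated tests would typically be run with something like `cargo test -p rooch-pruner --features gc-tests`, while a plain `cargo test` leaves them out of the build.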
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| crates/rooch-pruner/Cargo.toml | Added feature flags for conditional test compilation |
| Cargo.lock | Added move-core-types dependency |
| crates/rooch-pruner/tests/config_optimization_test.rs | Gated test with gc-tests feature |
| crates/rooch-pruner/src/tests/mod.rs | Added feature gates to multiple test modules |
| crates/rooch-pruner/src/util.rs | Extracted strict version of child node extraction for better error handling |
| crates/rooch-pruner/src/safety_verifier.rs | Gated tests with gc-tests feature |
| crates/rooch-pruner/src/marker.rs | Gated tests with gc-tests feature |
| crates/rooch-pruner/src/historical_state.rs | Gated tests with gc-tests feature |
| crates/rooch-pruner/src/garbage_collector.rs | Gated tests with gc-tests feature |
| crates/rooch/src/commands/db/commands/state_prune/replay.rs | Updated output path construction and removed unused moveos_store parameter |
| crates/rooch-pruner/src/state_prune/snapshot_builder.rs | Major refactoring: replaced raw RocksDB with MoveOSStore, added integrity verification and startup_info persistence |
| crates/rooch-pruner/src/state_prune/incremental_replayer.rs | Complete overhaul of replay logic: added checkpoint-based output preparation, changeset normalization, metadata trimming, and sequential state root tracking |

    // DEBUG: Log changeset size
    info!(
        "DEBUG tx_order {}: changeset has {} fields to update",
        tx_order,
        changeset.changes.len()
    );

The DEBUG logging statements should be removed or changed to use the debug! macro. These appear to be temporary debugging code that should not be committed to production.

    // DEBUG: Log all fields being updated
    for (field_key, obj_change) in &changeset.changes {
        debug!(
            "DEBUG tx_order {}: field {:?}, op: {:?}, object state_root: {:?}",
            tx_order, field_key, obj_change.value, obj_change.metadata.state_root
        );
    }

The DEBUG logging statements should be removed or changed to use the debug! macro. These appear to be temporary debugging code that should not be committed to production.

    let object_state = resolved.ok_or_else(|| {
        anyhow::anyhow!("Object {} not found in pre-state during replay", object_id)
    })?;

    obj_change.metadata.state_root = object_state.metadata.state_root;

The function normalize_changeset_pre_state_roots modifies the changeset in place through a mutable reference. For objects marked as Op::New, it sets state_root to None (line 416); for other operations, it retrieves the state_root from the resolved object and sets it. If the object is not found in the pre-state (lines 419-421), an error is returned, so any inconsistency in the live changesets or an incomplete snapshot makes the replay fail. Consider whether this is the intended behavior or whether edge cases should be handled more gracefully.

    - let object_state = resolved.ok_or_else(|| {
    -     anyhow::anyhow!("Object {} not found in pre-state during replay", object_id)
    - })?;
    - obj_change.metadata.state_root = object_state.metadata.state_root;
    + match resolved {
    +     Some(object_state) => {
    +         obj_change.metadata.state_root = object_state.metadata.state_root;
    +     }
    +     None => {
    +         // If the object is missing from the pre-state snapshot, keep the existing
    +         // state_root and continue replay. This makes replay more robust to
    +         // incomplete snapshots or minor inconsistencies in live changesets.
    +         warn!(
    +             "Object {} not found in pre-state during replay; \
    +             keeping existing state_root in changeset",
    +             object_id
    +         );
    +     }
    + }

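
To make the trade-off between the strict behavior and the suggested lenient behavior concrete, here is a minimal, self-contained sketch. It does not use the real MoveOS types: `Meta`, `MissingPolicy`, and `normalize_state_root` are hypothetical stand-ins, and a `HashMap` plays the role of the pre-state resolver.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for object metadata; only the field that the
/// normalization touches is modeled.
#[derive(Debug, Clone, PartialEq)]
struct Meta {
    state_root: Option<[u8; 32]>,
}

/// How to react when an object is absent from the pre-state.
#[derive(Clone, Copy)]
enum MissingPolicy {
    /// Abort the replay (the behavior in the PR).
    Fail,
    /// Keep the state_root already present in the changeset (the suggestion).
    KeepExisting,
}

/// Returns Ok(true) if the state_root was overwritten from the pre-state,
/// Ok(false) if the object was missing and the existing value was kept.
fn normalize_state_root(
    pre_state: &HashMap<String, Meta>,
    object_id: &str,
    change_meta: &mut Meta,
    policy: MissingPolicy,
) -> Result<bool, String> {
    match (pre_state.get(object_id), policy) {
        (Some(found), _) => {
            change_meta.state_root = found.state_root;
            Ok(true)
        }
        (None, MissingPolicy::Fail) => Err(format!(
            "Object {} not found in pre-state during replay",
            object_id
        )),
        (None, MissingPolicy::KeepExisting) => Ok(false),
    }
}
```

Making the policy an explicit parameter keeps the strict path as the default choice at the call site. Whether the lenient path is acceptable likely depends on whether a later state root comparison would still catch a genuinely corrupted snapshot.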

    pub fn save_startup_info(&self, state_root: H256, global_size: u64) -> Result<()> {
        let startup_info = StartupInfo::new(state_root, global_size);
        self.moveos_store
            .get_config_store()
            .save_startup_info(startup_info)?;
        Ok(())
    }

The save_startup_info function lacks test coverage. This function is critical for ensuring the snapshot can be opened as a MoveOS store and should be tested.

    info!(
        "DEBUG tx_order {}: changeset.state_root (post-root) = {:x}, current_state_root (pre-root) = {:x}",
        tx_order, expected_root, current_state_root
    );

The DEBUG logging statements should be removed or changed to use the debug! macro instead of info!. These appear to be temporary debugging code that should not be committed to production. DEBUG prefix in info! logs is unusual and suggests this is temporary code.

    fn verify_snapshot_integrity(&self, output_dir: &Path, state_root: H256) -> Result<()> {
        let snapshot_db_path = output_dir.join("snapshot.db");
        let registry = Registry::new();
        let snapshot_store = MoveOSStore::new(&snapshot_db_path, &registry).map_err(|e| {
            anyhow::anyhow!(
                "Failed to open snapshot database at {:?}: {}",
                snapshot_db_path,
                e
            )
        })?;
        let node_store = snapshot_store.get_state_node_store();

        if state_root != *SPARSE_MERKLE_PLACEHOLDER_HASH && node_store.get(&state_root)?.is_none() {
            return Err(anyhow::anyhow!(
                "Snapshot integrity check failed: missing root node {:x}",
                state_root
            ));
        }

        let raw_db = node_store
            .get_store()
            .store()
            .db()
            .ok_or_else(|| anyhow::anyhow!("Failed to access snapshot RocksDB instance"))?
            .inner();
        let cf = raw_db
            .cf_handle(STATE_NODE_COLUMN_FAMILY_NAME)
            .ok_or_else(|| anyhow::anyhow!("State node column family not found"))?;
        let mut iter = raw_db.raw_iterator_cf(&cf);
        let mut checked = 0u64;

        iter.seek_to_first();
        while iter.valid() {
            if let (Some(key), Some(value)) = (iter.key(), iter.value()) {
                if key.len() != 32 {
                    return Err(anyhow::anyhow!(
                        "Invalid state node key length: {}",
                        key.len()
                    ));
                }
                let children = extract_child_nodes_strict(value)?;
                for child in children {
                    if node_store.get(&child)?.is_none() {
                        return Err(anyhow::anyhow!(
                            "Snapshot integrity check failed: missing child node {:x}",
                            child
                        ));
                    }
                }
                checked += 1;
            }
            iter.next();
        }
        iter.status()
            .map_err(|e| anyhow::anyhow!("Snapshot iterator error: {}", e))?;

        info!(
            "Snapshot integrity check passed ({} nodes verified)",
            checked
        );
        Ok(())
    }

The new verify_snapshot_integrity function lacks test coverage. This is a critical verification function that should have tests to ensure it correctly detects missing nodes, invalid keys, and other integrity issues.
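
One way to approach such tests is to exercise the closure check against a small in-memory node set first. The sketch below is a simplified model rather than the real function: `NodeMap` and `verify_closure` are hypothetical, and each node's value is modeled directly as its list of child hashes, whereas the real store decodes children from serialized bytes.

```rust
use std::collections::HashMap;

type Hash = [u8; 32];

/// Toy node store: maps a node hash to the hashes of its children.
type NodeMap = HashMap<Hash, Vec<Hash>>;

/// Checks that the root is present (unless it is the placeholder) and that
/// every child referenced by any stored node is itself stored.
/// Returns the number of nodes checked, or a message naming the first problem.
fn verify_closure(nodes: &NodeMap, root: Hash, placeholder: Hash) -> Result<u64, String> {
    if root != placeholder && !nodes.contains_key(&root) {
        return Err("missing root node".to_string());
    }
    let mut checked = 0u64;
    for children in nodes.values() {
        for child in children {
            if !nodes.contains_key(child) {
                return Err(format!("missing child node {:x?}", &child[..4]));
            }
        }
        checked += 1;
    }
    Ok(checked)
}
```

Tests against the real function would presumably follow the same cases (intact tree, deleted leaf, absent root, empty tree with the placeholder root), but built on a temporary store directory instead of a map.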

    /// Create output database by checkpointing the live store
    fn prepare_output_store(&self, output_dir: &Path) -> Result<()> {
        if output_dir.exists() {
            return Err(anyhow::anyhow!(
                "Output directory already exists: {:?}. Please provide an empty path.",
                output_dir
            ));
        }

        if let Some(parent) = output_dir.parent() {
            fs::create_dir_all(parent)?;
        }

        let rocks_db = self
            .rooch_store
            .store_instance
            .db()
            .ok_or_else(|| anyhow::anyhow!("Failed to access RocksDB instance"))?
            .inner();

        let checkpoint = Checkpoint::new(rocks_db)
            .map_err(|e| anyhow::anyhow!("Checkpoint init failed: {}", e))?;
        checkpoint
            .create_checkpoint(output_dir)
            .map_err(|e| anyhow::anyhow!("Failed to create output checkpoint: {}", e))?;

        info!("Created output database checkpoint at {:?}", output_dir);
        Ok(())
    }


    /// Load output store from output path
    fn load_output_store(&self, output_dir: &Path) -> Result<MoveOSStore> {
        if !output_dir.exists() || !output_dir.is_dir() {
            return Err(anyhow::anyhow!(
                "Output database not found at {:?}.",
                output_dir
            ));
        }

        let registry = Registry::new();
        let output_store = MoveOSStore::new(output_dir, &registry).map_err(|e| {
            anyhow::anyhow!("Failed to load output store from {:?}: {}", output_dir, e)
        })?;

        Ok(output_store)
    }


    fn load_output_rooch_store(&self, output_store: &MoveOSStore) -> Result<RoochStore> {
        let store_instance = output_store
            .get_state_node_store()
            .get_store()
            .store()
            .clone();

        let registry = Registry::new();
        RoochStore::new_with_instance(store_instance, &registry)
            .map_err(|e| anyhow::anyhow!("Failed to load output RoochStore from output DB: {}", e))
    }


    /// Clear state nodes in output store before importing snapshot
    fn clear_state_nodes(&self, output_store: &MoveOSStore) -> Result<()> {
        let node_store = output_store.get_state_node_store();
        let start = H256::zero();
        let end = H256::from_slice(&[0xFFu8; 32]);

        node_store.delete_range_nodes(start, end, true)?;
        node_store.delete_nodes_with_flush(vec![end], true)?;

        Ok(())
    }


    /// Import snapshot nodes into output store
    fn import_snapshot_nodes(
        &self,
        snapshot_store: &MoveOSStore,
        output_store: &MoveOSStore,
        report: &mut ReplayReport,
        metadata: &mut StatePruneMetadata,
    ) -> Result<()> {
        let raw_db = snapshot_store
            .get_state_node_store()
            .get_store()
            .store()
            .db()
            .ok_or_else(|| anyhow::anyhow!("Failed to access snapshot RocksDB instance"))?
            .inner();
        let cf = raw_db
            .cf_handle(STATE_NODE_COLUMN_FAMILY_NAME)
            .ok_or_else(|| anyhow::anyhow!("State node column family not found"))?;
        let mut iter = raw_db.raw_iterator_cf(&cf);
        iter.seek_to_first();

        let mut batch = BTreeMap::new();
        let mut imported = 0u64;
        let mut last_report = Instant::now();

        while iter.valid() {
            if let (Some(key), Some(value)) = (iter.key(), iter.value()) {
                if key.len() != 32 {
                    return Err(anyhow::anyhow!(
                        "Invalid state node key length: {}",
                        key.len()
                    ));
                }
                let hash = H256::from_slice(key);
                batch.insert(hash, value.to_vec());
            }

            if batch.len() >= self.config.default_batch_size {
                output_store.get_state_node_store().write_nodes(batch)?;
                imported += self.config.default_batch_size as u64;
                batch = BTreeMap::new();

                if last_report.elapsed() >= Duration::from_secs(30) {
                    metadata
                        .mark_in_progress(format!("Importing snapshot nodes ({})", imported), 20.0);
                    last_report = Instant::now();
                }
            }

            iter.next();
        }

        iter.status()
            .map_err(|e| anyhow::anyhow!("Snapshot iterator error: {}", e))?;

        if !batch.is_empty() {
            imported += batch.len() as u64;
            output_store.get_state_node_store().write_nodes(batch)?;
        }

        report.nodes_updated += imported;
        info!("Imported {} snapshot nodes into output store", imported);

        Ok(())
    }


    fn update_startup_info(
        &self,
        output_store: &MoveOSStore,
        state_root: H256,
        global_size: u64,
    ) -> Result<()> {
        output_store
            .get_config_store()
            .save_startup_info(StartupInfo::new(state_root, global_size))?;
        Ok(())
    }


    fn normalize_changeset_pre_state_roots(
        &self,
        output_store: &MoveOSStore,
        pre_state_root: H256,
        changeset: &mut moveos_types::state::StateChangeSet,
    ) -> Result<()> {
        let root_metadata = moveos_types::moveos_std::object::ObjectMeta::root_metadata(
            pre_state_root,
            changeset.global_size,
        );
        let resolver =
            moveos_types::state_resolver::RootObjectResolver::new(root_metadata, output_store);

        for obj_change in changeset.changes.values_mut() {
            self.normalize_object_change_pre_state_root(&resolver, obj_change)?;
        }

        Ok(())
    }


    fn normalize_object_change_pre_state_root(
        &self,
        resolver: &moveos_types::state_resolver::RootObjectResolver<'_, MoveOSStore>,
        obj_change: &mut moveos_types::state::ObjectChange,
    ) -> Result<()> {
        let object_id = obj_change.metadata.id.clone();
        let resolved = resolver.get_object(&object_id)?;

        match &obj_change.value {
            Some(Op::New(_)) => {
                if resolved.is_some() {
                    return Err(anyhow::anyhow!(
                        "Object {} marked as New but exists in pre-state",
                        object_id
                    ));
                }
                if obj_change.metadata.state_root.is_some()
                    && obj_change.metadata.state_root != Some(*GENESIS_STATE_ROOT)
                {
                    warn!(
                        "New object {} has non-empty state_root; resetting to GENESIS for replay",
                        object_id
                    );
                }
                obj_change.metadata.state_root = None;
            }
            _ => {
                let object_state = resolved.ok_or_else(|| {
                    anyhow::anyhow!("Object {} not found in pre-state during replay", object_id)
                })?;

                obj_change.metadata.state_root = object_state.metadata.state_root;
            }
        }

        for child in obj_change.fields.values_mut() {
            self.normalize_object_change_pre_state_root(resolver, child)?;
        }

        Ok(())
    }


    fn trim_output_store(
        &self,
        output_store: &MoveOSStore,
        to_order: u64,
        metadata: &mut StatePruneMetadata,
    ) -> Result<()> {
        let output_rooch_store = self.load_output_rooch_store(output_store)?;

        let sequencer_info = output_rooch_store
            .get_meta_store()
            .get_sequencer_info()?
            .ok_or_else(|| anyhow::anyhow!("Sequencer info not found in output store"))?;
        let last_order = sequencer_info.last_order;

        if to_order > last_order {
            return Err(anyhow::anyhow!(
                "to_order {} exceeds output store last_order {}",
                to_order,
                last_order
            ));
        }

        if to_order == last_order {
            info!(
                "Output store already at to_order {}, skipping metadata trim",
                to_order
            );
            return Ok(());
        }

        metadata.mark_in_progress(
            format!("Trimming output metadata ({} -> {})", last_order, to_order),
            92.0,
        );

        let target_tx = output_rooch_store
            .transaction_store
            .get_tx_by_order(to_order)?
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Missing transaction for to_order {} in output store",
                    to_order
                )
            })?;
        let new_sequencer_info =
            SequencerInfo::new(to_order, target_tx.sequence_info.tx_accumulator_info());
        output_rooch_store
            .get_meta_store()
            .save_sequencer_info_unsafe(new_sequencer_info)?;

        let mut removed_transactions = 0u64;
        let mut removed_changesets = 0u64;
        let mut start = to_order
            .checked_add(1)
            .ok_or_else(|| anyhow::anyhow!("to_order overflow"))?;
        let batch_size = self.config.default_batch_size.max(1) as u64;

        while start <= last_order {
            let end = min(last_order, start.saturating_add(batch_size - 1));
            let orders: Vec<u64> = (start..=end).collect();
            let tx_hashes = output_rooch_store.transaction_store.get_tx_hashes(orders)?;

            for (index, tx_hash_opt) in tx_hashes.into_iter().enumerate() {
                let tx_order = start + index as u64;

                output_rooch_store
                    .get_state_store()
                    .remove_state_change_set(tx_order)?;
                removed_changesets += 1;

                if let Some(tx_hash) = tx_hash_opt {
                    output_rooch_store
                        .transaction_store
                        .remove_transaction(tx_hash, tx_order)?;
                    output_store
                        .get_transaction_store()
                        .remove_tx_execution_info(tx_hash)?;
                    removed_transactions += 1;
                } else {
                    warn!("Missing tx hash for order {} during trim", tx_order);
                }
            }

            start = end.saturating_add(1);
        }

        let (da_issues, da_fixed) =
            output_rooch_store.try_repair_da_meta(to_order, false, None, false, false)?;
        if da_issues > 0 {
            info!(
                "DA meta repair after trim: issues {}, fixed {}",
                da_issues, da_fixed
            );
        }

        let last_block_number = output_rooch_store.get_last_block_number()?;
        let last_proposed = output_rooch_store.get_last_proposed()?;
        match (last_block_number, last_proposed) {
            (None, Some(_)) => {
                output_rooch_store.clear_last_proposed()?;
            }
            (Some(last_block_number), Some(last_proposed)) if last_proposed > last_block_number => {
                output_rooch_store.set_last_proposed(last_block_number)?;
            }
            _ => {}
        }

        info!(
            "Trimmed output store to order {} (removed {} txs, {} changesets)",
            to_order, removed_transactions, removed_changesets
        );

        Ok(())
    }

The new functions prepare_output_store, clear_state_nodes, import_snapshot_nodes, normalize_changeset_pre_state_roots, normalize_object_change_pre_state_root, and trim_output_store lack test coverage. These are complex functions with critical functionality that should be tested.
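
Some of this logic can be tested without a database if the pure parts are pulled out. As a hedged sketch, the batching loop of trim_output_store could be expressed as a standalone function; `trim_batches` is a hypothetical name and is not part of the PR.

```rust
/// Splits the orders strictly after `to_order` up to and including
/// `last_order` into inclusive (start, end) batches of at most
/// `batch_size` orders each. A batch size of 0 is treated as 1.
fn trim_batches(to_order: u64, last_order: u64, batch_size: u64) -> Vec<(u64, u64)> {
    let batch_size = batch_size.max(1);
    let mut batches = Vec::new();
    // If to_order is u64::MAX there is nothing after it to trim.
    let Some(mut start) = to_order.checked_add(1) else {
        return batches;
    };
    while start <= last_order {
        let end = last_order.min(start.saturating_add(batch_size - 1));
        batches.push((start, end));
        // Stop explicitly at u64::MAX: a saturating increment would leave
        // `start` unchanged there and the loop would never terminate.
        match end.checked_add(1) {
            Some(next) => start = next,
            None => break,
        }
    }
    batches
}
```

For example, `trim_batches(10, 15, 2)` yields the batches (11, 12), (13, 14), and (15, 15). The explicit `checked_add` at the end of the loop differs from the `saturating_add(1)` in the reviewed code, which would not advance if `last_order` were `u64::MAX`; that case is unlikely in practice but cheap to rule out.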

      // Validate range (inclusive)
      if from_order > to_order {
          info!("Empty changeset range: {}..{}", from_order, to_order);
          return Ok(Vec::new());
      }

      info!(
    -     "Loading changesets in range {}..{} from rooch_store",
    +     "Loading changesets in range {}..{} (inclusive) from rooch_store",
          from_order, to_order
      );

      let range_end = to_order
          .checked_add(1)
          .ok_or_else(|| anyhow::anyhow!("to_order overflow: {}", to_order))?;

      // Load changesets from the rooch_store's state store
      let changesets = self
          .rooch_store
          .get_state_store()
    -     .get_changesets_range(from_order, to_order)
    +     .get_changesets_range(from_order, range_end)

The comment says "inclusive" but the original range operation was exclusive (from_order..to_order). Now the code adds 1 to to_order to make it inclusive, but the logic and variable names should be clearer about this transformation. The comment on line 556 could be misleading since the actual range parameter is still exclusive in the implementation.
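
One way to make the transformation explicit is to name the two conventions in the code. The sketch below assumes, as the comment implies, that the underlying range call takes an exclusive end; `exclusive_end` and `orders_in_range` are hypothetical helpers that use a plain Rust range in place of the store call.

```rust
/// Converts an inclusive upper bound into the exclusive bound expected by a
/// half-open range API, failing instead of wrapping at u64::MAX.
fn exclusive_end(to_order_inclusive: u64) -> Result<u64, String> {
    to_order_inclusive
        .checked_add(1)
        .ok_or_else(|| format!("to_order overflow: {}", to_order_inclusive))
}

/// Lists the orders in [from_order, to_order_inclusive]. An inverted range
/// is treated as empty rather than as an error.
fn orders_in_range(from_order: u64, to_order_inclusive: u64) -> Result<Vec<u64>, String> {
    if from_order > to_order_inclusive {
        return Ok(Vec::new());
    }
    let end_exclusive = exclusive_end(to_order_inclusive)?;
    Ok((from_order..end_exclusive).collect())
}
```

Naming the variables `to_order_inclusive` and `end_exclusive` keeps the off-by-one conversion in a single place and makes the intent readable at the call site.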

    // DEBUG: Enhanced error message
    anyhow::anyhow!(
        "Failed to convert changeset {} to nodes: {}\n current_state_root (pre): {:x}\n expected_root (post): {:x}\n fields_count: {}",
        tx_order, e, current_state_root, expected_root, changeset.changes.len()

Enhanced error message contains DEBUG prefix and verbose logging. This should be cleaned up to provide a concise error message without debug-style formatting.

    - // DEBUG: Enhanced error message
    - anyhow::anyhow!(
    -     "Failed to convert changeset {} to nodes: {}\n current_state_root (pre): {:x}\n expected_root (post): {:x}\n fields_count: {}",
    -     tx_order, e, current_state_root, expected_root, changeset.changes.len()
    + anyhow::anyhow!(
    +     "Failed to convert changeset for tx_order {}: {}",
    +     tx_order,
    +     e


    fn verify_snapshot_integrity(&self, output_dir: &Path, state_root: H256) -> Result<()> {
        let snapshot_db_path = output_dir.join("snapshot.db");
        let registry = Registry::new();
        let snapshot_store = MoveOSStore::new(&snapshot_db_path, &registry).map_err(|e| {
            anyhow::anyhow!(
                "Failed to open snapshot database at {:?}: {}",
                snapshot_db_path,
                e
            )
        })?;

The verify_snapshot_integrity function creates a new MoveOSStore instance to verify the snapshot, which may be expensive. Consider reusing the snapshot_writer's store instance instead of opening a new database connection, or document why a fresh instance is necessary for verification.