Conversation
How to use/test partial snapshot API

```bash
# - start two separate clusters/peers
# - either create `test_collection` on both peers
# - or create `test_collection` on "write" peer, and then recover collection snapshot on "read" peer
# - but everything should work practically the same way in both cases
# - optionally, upsert some points to "write" peer

WRITE_PEER=http://localhost:6333
READ_PEER=http://localhost:6343

# Get partial snapshot manifest from "read" peer
#
# Optionally, you can use empty manifest `{}` (in this case partial snapshot would include all segments and all files)
MANIFEST=$(curl -s --fail-with-body $READ_PEER/collections/test_collection/shards/0/snapshot/partial/manifest | jq -r .result)

# Create partial snapshot on "write" peer
#
# Note: the manifest must be quoted, or JSON containing spaces would be split into multiple arguments
curl -s --fail-with-body \
    -X POST $WRITE_PEER/collections/test_collection/shards/0/snapshot/partial/create \
    -H 'Content-Type: application/json' \
    -d "$MANIFEST" \
    -o partial-snapshot.tar

# Upload and recover partial snapshot on "read" peer
curl -s --fail-with-body \
    -X POST $READ_PEER/collections/test_collection/shards/0/snapshot/partial/recover \
    -F snapshot=@partial-snapshot.tar
```
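For scripting, the same three-step flow can be sketched in Python. This is a hedged sketch assuming the `requests` package; the peer URLs, collection name, and shard id mirror the curl example, and the helper names (`partial_snapshot_url`, `sync_read_peer`) are introduced here, not part of the API:

```python
# Hedged Python equivalent of the curl commands above (assumes `requests`).
WRITE_PEER = "http://localhost:6333"
READ_PEER = "http://localhost:6343"

def partial_snapshot_url(peer: str, action: str,
                         collection: str = "test_collection", shard: int = 0) -> str:
    """Build a partial-snapshot endpoint URL for a peer."""
    return f"{peer}/collections/{collection}/shards/{shard}/snapshot/partial/{action}"

def sync_read_peer(snapshot_file: str = "partial-snapshot.tar") -> None:
    import requests  # lazy import so the URL helper stays dependency-free

    # 1. Get the manifest from the "read" peer ({} would request all segments/files).
    manifest = requests.get(partial_snapshot_url(READ_PEER, "manifest")).json()["result"]

    # 2. Create a partial snapshot on the "write" peer for that manifest.
    resp = requests.post(partial_snapshot_url(WRITE_PEER, "create"), json=manifest)
    with open(snapshot_file, "wb") as f:
        f.write(resp.content)

    # 3. Upload and recover the snapshot on the "read" peer.
    with open(snapshot_file, "rb") as f:
        requests.post(partial_snapshot_url(READ_PEER, "recover"), files={"snapshot": f})
```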
Pushed integration tests into the PR.
Is it expected behavior that, if we try to recover a delta older than the actual segments in the collection, it will overwrite newer data?
Overall works well.
Yes. I'll add validation (that all files in the snapshot manifest should be newer than local files) in one of the follow-up PRs.
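A minimal sketch of what that follow-up validation could look like, phrased in terms of the manifest shape the Python tests use (segment id mapped to a `file_versions` dict of path to version). The function name and error handling are assumptions, not the actual Rust implementation:

```python
# Hedged sketch of the planned check: reject a partial snapshot whose
# files are older than the corresponding local files.
def assert_snapshot_not_older(local_manifest: dict, snapshot_manifest: dict) -> None:
    """Raise ValueError if any snapshot file is older than its local counterpart."""
    for segment_id, snap_segment in snapshot_manifest.items():
        # Segments unknown locally have nothing to be older than.
        local_versions = local_manifest.get(segment_id, {}).get("file_versions", {})
        for path, version in snap_segment.get("file_versions", {}).items():
            local_version = local_versions.get(path)
            if local_version is not None and version < local_version:
                raise ValueError(
                    f"snapshot file {path} (version {version}) is older "
                    f"than local version {local_version}"
                )
```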
Actionable comments posted: 0
♻️ Duplicate comments (14)
tests/consensus_tests/test_partial_snapshot.py (14)
42-46: Add docstring for the `bootstrap_peers` function. This helper function lacks documentation to explain its purpose and parameters.

```diff
 def bootstrap_peers(tmp: pathlib.Path, bootstrap_points = 0, recover_read = False):
+    """Bootstrap write and read peers for testing.
+
+    Args:
+        tmp: Temporary directory for storing peer data
+        bootstrap_points: Number of points to bootstrap the collection with
+        recover_read: Whether to recover the read peer from the write peer
+
+    Returns:
+        A tuple containing the write peer URL and read peer URL
+    """
     write_peer = bootstrap_write_peer(tmp, bootstrap_points)
     read_peer = bootstrap_read_peer(tmp, write_peer if recover_read else None)
     return write_peer, read_peer
```
47-61: Add docstrings and parameterize hardcoded port values. The peer bootstrapping functions lack documentation and contain hardcoded port values that should be defined as constants at the module level.

```diff
+# Default port numbers for peers
+WRITE_PEER_PORT = 6331
+READ_PEER_PORT = 63331
+
 def bootstrap_write_peer(tmp: pathlib.Path, bootstrap_points = 0):
+    """Bootstrap a write peer for testing.
+
+    Args:
+        tmp: Temporary directory for storing peer data
+        bootstrap_points: Number of points to bootstrap the collection with
+
+    Returns:
+        URL of the bootstrapped write peer
+    """
-    write_peer = bootstrap_peer(tmp / "write", 6331)
+    write_peer = bootstrap_peer(tmp / "write", WRITE_PEER_PORT)
     bootstrap_collection(write_peer, bootstrap_points)
     return write_peer

 def bootstrap_read_peer(tmp: pathlib.Path, recover_from_url: str | None = None):
+    """Bootstrap a read peer for testing.
+
+    Args:
+        tmp: Temporary directory for storing peer data
+        recover_from_url: URL to recover the collection from, if provided
+
+    Returns:
+        URL of the bootstrapped read peer
+    """
-    read_peer = bootstrap_peer(tmp / "read", 63331)
+    read_peer = bootstrap_peer(tmp / "read", READ_PEER_PORT)
     if recover_from_url is None:
         bootstrap_collection(read_peer)
     else:
         recover_collection(read_peer, recover_from_url)
     return read_peer
```
62-73: Add docstring for `bootstrap_peer` function. The `bootstrap_peer` function lacks documentation explaining its purpose and parameters.

```diff
 def bootstrap_peer(path: pathlib.Path, port: int):
+    """Bootstrap a single peer with the given configuration.
+
+    Args:
+        path: Path to store peer data
+        port: Base port number for the peer
+
+    Returns:
+        URL of the bootstrapped peer
+    """
     path.mkdir()
     config = {
         "QDRANT__LOG_LEVEL": "debug,collection::common::file_utils=trace",
         "QDRANT__FEATURE_FLAGS__USE_MUTABLE_ID_TRACKER_WITHOUT_ROCKSDB": "true",
     }
     uris, _, _ = start_cluster(path, 1, port_seed = port, extra_env = config)
     return uris[0]
```
74-80: Add docstring and parameterize hardcoded values in `bootstrap_collection`. The `bootstrap_collection` function lacks documentation and contains a hardcoded indexing threshold value.

```diff
+# Default collection configuration
+INDEXING_THRESHOLD = 1000000
+
 def bootstrap_collection(peer_url, bootstrap_points = 0):
+    """Bootstrap a collection on the given peer.
+
+    Args:
+        peer_url: URL of the peer to bootstrap the collection on
+        bootstrap_points: Number of points to bootstrap the collection with
+    """
-    create_collection(peer_url, shard_number = 1, replication_factor = 1, indexing_threshold = 1000000, sparse_vectors = False)
+    create_collection(peer_url, shard_number = 1, replication_factor = 1, indexing_threshold = INDEXING_THRESHOLD, sparse_vectors = False)
     wait_collection_exists_and_active_on_all_peers(COLLECTION, [peer_url])
     if bootstrap_points > 0:
         upsert(peer_url, bootstrap_points)
```
81-86: Add docstring and improve error handling in `recover_collection`. The `recover_collection` function lacks documentation and error handling for snapshot operations.

```diff
 def recover_collection(peer_url: str, recover_from_url: str):
+    """Recover a collection on the target peer from another peer.
+
+    Args:
+        peer_url: URL of the peer to recover the collection on
+        recover_from_url: URL of the peer to recover the collection from
+
+    Raises:
+        Exception: If the snapshot creation or recovery fails
+    """
     snapshot_url = create_collection_snapshot(recover_from_url)
     recover_collection_snapshot(peer_url, snapshot_url)
     assert_point_consistency(peer_url, recover_from_url)
```
87-103: Add docstrings and improve error handling for snapshot operations. These snapshot operation functions would benefit from documentation and more robust error handling for network requests.

```diff
 def create_collection_snapshot(peer_url: str):
+    """Create a snapshot of the collection on the given peer.
+
+    Args:
+        peer_url: URL of the peer to create the snapshot on
+
+    Returns:
+        URL of the created snapshot
+
+    Raises:
+        Exception: If the snapshot creation fails
+    """
     resp = requests.post(f"{peer_url}/collections/{COLLECTION}/snapshots")
     assert_http_ok(resp)
     snapshot_name = resp.json()["result"]["name"]
     snapshot_url = f"{peer_url}/collections/{COLLECTION}/snapshots/{snapshot_name}"
     return snapshot_url

 def recover_collection_snapshot(peer_url: str, snapshot_url: str):
+    """Recover a collection from a snapshot.
+
+    Args:
+        peer_url: URL of the peer to recover the collection on
+        snapshot_url: URL of the snapshot to recover from
+
+    Returns:
+        The response result from the recovery operation
+
+    Raises:
+        Exception: If the snapshot recovery fails
+    """
     resp = requests.put(
         f"{peer_url}/collections/{COLLECTION}/snapshots/recover",
         json = { "location": snapshot_url }
     )
     assert_http_ok(resp)
     return resp.json()["result"]
```
111-116: Add docstring for `get_snapshot_manifest` function. This function lacks documentation explaining its purpose and parameters.

```diff
 def get_snapshot_manifest(peer_url: str):
+    """Get the snapshot manifest from a peer.
+
+    Args:
+        peer_url: URL of the peer to get the manifest from
+
+    Returns:
+        The snapshot manifest
+
+    Raises:
+        Exception: If the request fails
+    """
     resp = requests.get(f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/manifest")
     assert_http_ok(resp)
     return resp.json()["result"]
```
117-130: Add docstring and improve error handling for `create_partial_snapshot`. This function lacks documentation and error handling for file operations.

```diff
 def create_partial_snapshot(peer_url: str, manifest: dict[str, Any], tmp: pathlib.Path):
+    """Create a partial snapshot on a peer using the provided manifest.
+
+    Args:
+        peer_url: URL of the peer to create the snapshot on
+        manifest: Manifest specifying which segments to include
+        tmp: Temporary path to store the snapshot
+
+    Returns:
+        Path to the created snapshot file
+
+    Raises:
+        Exception: If the snapshot creation fails or if writing to file fails
+    """
     snapshot_resp = requests.post(
         f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/create",
         json = manifest,
     )
     assert_http_ok(snapshot_resp)
     snapshot_path = tmp / "partial-snapshot.tar"
+    try:
         with open(snapshot_path, "wb") as snapshot_file:
             snapshot_file.write(snapshot_resp.content)
+    except IOError as e:
+        raise Exception(f"Failed to write snapshot to file: {e}")
     return snapshot_path
```
131-140: Add docstring and improve error handling for `recover_partial_snapshot`. This function lacks documentation and error handling for file operations.

```diff
 def recover_partial_snapshot(peer_url: str, snapshot_path: pathlib.Path):
+    """Recover a partial snapshot on a peer from a snapshot file.
+
+    Args:
+        peer_url: URL of the peer to recover the snapshot on
+        snapshot_path: Path to the snapshot file
+
+    Returns:
+        The response result from the recovery operation
+
+    Raises:
+        Exception: If the snapshot recovery fails or if reading from file fails
+    """
+    try:
         with open(snapshot_path, "rb") as snapshot_file:
             resp = requests.post(
                 f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/recover",
                 files = { "snapshot": snapshot_file },
             )
         assert_http_ok(resp)
         return resp.json()["result"]
+    except IOError as e:
+        raise Exception(f"Failed to read snapshot from file: {e}")
```
142-145: Add docstring for `assert_consistency` function. This function lacks documentation explaining its purpose and parameters.

```diff
 def assert_consistency(write_peer: str, read_peer: str):
+    """Assert that the write and read peers have consistent manifests and points.
+
+    Args:
+        write_peer: URL of the write peer
+        read_peer: URL of the read peer
+
+    Raises:
+        AssertionError: If manifests or points are inconsistent
+    """
     assert_files_consistency(write_peer, read_peer)
     assert_point_consistency(write_peer, read_peer)
```
146-148: Add docstring for `assert_files_consistency` function. This function lacks documentation explaining its purpose and parameters.

```diff
 def assert_files_consistency(write_peer: str, read_peer: str):
+    """Assert that the write and read peers have consistent file manifests.
+
+    Args:
+        write_peer: URL of the write peer
+        read_peer: URL of the read peer
+
+    Raises:
+        AssertionError: If file manifests are inconsistent
+    """
     assert discard_file_versions(get_snapshot_manifest(write_peer)) == discard_file_versions(get_snapshot_manifest(read_peer))
```
155-157: Add docstring for `assert_point_consistency` function. This function lacks documentation explaining its purpose and parameters.

```diff
 def assert_point_consistency(write_peer: str, read_peer: str):
+    """Assert that the write and read peers have consistent points.
+
+    Args:
+        write_peer: URL of the write peer
+        read_peer: URL of the read peer
+
+    Raises:
+        AssertionError: If points are inconsistent
+    """
     assert scroll_points(write_peer) == scroll_points(read_peer)
```
158-170: Add docstring and implement pagination for large collections. The `scroll_points` function lacks documentation and might have issues with very large collections.

```diff
 def scroll_points(peer_url: str):
+    """Scroll all points from a collection on a peer.
+
+    Args:
+        peer_url: URL of the peer to scroll points from
+
+    Returns:
+        Dictionary of points indexed by ID
+    """
     resp = requests.post(f"{peer_url}/collections/{COLLECTION}/points/scroll", json = {
         "limit": 1000000,
         "with_vectors": True,
         "with_payload": True,
     })
     assert_http_ok(resp)
     points = resp.json()["result"]["points"]
     points = { point["id"]: point for point in points }
     return points
```

Consider implementing pagination for very large collections:

```python
def scroll_points(peer_url: str):
    """Scroll all points from a collection on a peer with pagination.

    Args:
        peer_url: URL of the peer to scroll points from

    Returns:
        Dictionary of points indexed by ID
    """
    all_points = {}
    offset = None
    limit = 100  # Smaller batch size for pagination

    while True:
        request_body = {
            "limit": limit,
            "with_vectors": True,
            "with_payload": True,
        }
        if offset is not None:
            request_body["offset"] = offset

        resp = requests.post(f"{peer_url}/collections/{COLLECTION}/points/scroll", json=request_body)
        assert_http_ok(resp)

        result = resp.json()["result"]
        batch_points = result["points"]

        # Add points to our collection
        for point in batch_points:
            all_points[point["id"]] = point

        # Check if we've reached the end
        if len(batch_points) < limit or "next_page_offset" not in result:
            break

        offset = result["next_page_offset"]

    return all_points
```
149-154: 🛠️ Refactor suggestion — avoid mutating the input parameter and add docstring. The function modifies the input parameter directly, which can cause side effects. Create a copy of the manifest first to avoid unexpected changes to the caller's data.

```diff
 def discard_file_versions(snapshot_manifest: Any):
+    """Discard file versions from a snapshot manifest, keeping only file names.
+
+    Args:
+        snapshot_manifest: The snapshot manifest to process
+
+    Returns:
+        A copy of the manifest with file versions replaced by a set of file names
+    """
+    # Create a deep copy to avoid modifying the original
+    import copy
+    manifest_copy = copy.deepcopy(snapshot_manifest)
+
-    for _, segment_manifest in snapshot_manifest.items():
+    for _, segment_manifest in manifest_copy.items():
         segment_manifest['files'] = set(segment_manifest.pop('file_versions').keys())
-    return snapshot_manifest
+    return manifest_copy
```
🧹 Nitpick comments (3)
tests/consensus_tests/test_partial_snapshot.py (3)
1-8: Consider using explicit imports instead of star imports. The file uses star imports from multiple modules, which makes it harder to track which functions come from which modules. This can lead to maintenance issues and potential name conflicts.

```diff
-from .assertions import *
-from .fixtures import *
-from .utils import *
+from .assertions import assert_http_ok
+from .fixtures import upsert_random_points
+from .utils import (
+    assert_project_root, start_cluster, create_collection,
+    wait_collection_exists_and_active_on_all_peers,
+)
```
🪛 Ruff (0.8.2)
5-5:
from .assertions import *used; unable to detect undefined names(F403)
6-6:
from .fixtures import *used; unable to detect undefined names(F403)
7-7:
from .utils import *used; unable to detect undefined names(F403)
105-110: Add docstring for `recover_partial` function and fix manifest handling. This function lacks documentation and has a subtle issue with how it handles the manifest parameter.

```diff
 def recover_partial(peer_url: str, recover_from_url: str, tmp: pathlib.Path, manifest: Any = None):
+    """Recover a collection on the target peer from a partial snapshot of another peer.
+
+    Args:
+        peer_url: URL of the peer to recover to
+        recover_from_url: URL of the peer to recover from
+        tmp: Temporary path for storing the snapshot
+        manifest: Optional manifest to use (if None, gets manifest from peer_url)
+    """
     snapshot_path = create_partial_snapshot(recover_from_url, get_snapshot_manifest(peer_url) if manifest is None else manifest, tmp)
     recover_partial_snapshot(peer_url, snapshot_path)
     assert_consistency(recover_from_url, peer_url)
```
172-174: Add docstring for `upsert` function. This function lacks documentation explaining its purpose and parameters.

```diff
 def upsert(peer_url: str, points: int, offset = 0):
+    """Upsert random points to a collection.
+
+    Args:
+        peer_url: URL of the peer to upsert points to
+        points: Number of points to upsert
+        offset: Starting offset for point IDs
+    """
     upsert_random_points(peer_url, points, offset = offset, batch_size = 10, with_sparse_vector = False)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- lib/collection/src/collection/snapshots.rs (9 hunks)
- lib/collection/src/collection_manager/holders/segment_holder.rs (4 hunks)
- lib/collection/src/common/file_utils.rs (1 hunks)
- lib/collection/src/shards/dummy_shard.rs (1 hunks)
- lib/collection/src/shards/forward_proxy_shard.rs (1 hunks)
- lib/collection/src/shards/local_shard/mod.rs (2 hunks)
- lib/collection/src/shards/proxy_shard.rs (1 hunks)
- lib/collection/src/shards/queue_proxy_shard.rs (1 hunks)
- lib/collection/src/shards/replica_set/snapshots.rs (5 hunks)
- lib/collection/src/shards/shard.rs (1 hunks)
- lib/collection/src/shards/shard_holder/mod.rs (8 hunks)
- lib/segment/src/data_types/segment_manifest.rs (2 hunks)
- lib/segment/src/segment/snapshot.rs (2 hunks)
- lib/storage/src/content_manager/snapshots/recover.rs (2 hunks)
- src/actix/api/snapshot_api.rs (5 hunks)
- src/common/snapshots.rs (6 hunks)
- tests/consensus_tests/test_partial_snapshot.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- lib/collection/src/shards/proxy_shard.rs
🚧 Files skipped from review as they are similar to previous changes (15)
- lib/storage/src/content_manager/snapshots/recover.rs
- lib/collection/src/shards/shard.rs
- lib/collection/src/shards/forward_proxy_shard.rs
- lib/segment/src/segment/snapshot.rs
- lib/collection/src/shards/local_shard/mod.rs
- lib/collection/src/shards/replica_set/snapshots.rs
- lib/segment/src/data_types/segment_manifest.rs
- lib/collection/src/shards/dummy_shard.rs
- src/actix/api/snapshot_api.rs
- lib/collection/src/shards/shard_holder/mod.rs
- lib/collection/src/common/file_utils.rs
- src/common/snapshots.rs
- lib/collection/src/shards/queue_proxy_shard.rs
- lib/collection/src/collection/snapshots.rs
- lib/collection/src/collection_manager/holders/segment_holder.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/consensus_tests/test_partial_snapshot.py (3)
tests/consensus_tests/utils.py (3)
- assert_project_root (90-93)
- start_cluster (189-214)
- wait_collection_exists_and_active_on_all_peers (710-714)

tests/consensus_tests/assertions.py (1)
- assert_http_ok (6-13)

tests/consensus_tests/fixtures.py (1)
- upsert_random_points (83-129)
🪛 Ruff (0.8.2)
tests/consensus_tests/test_partial_snapshot.py
5-5: from .assertions import * used; unable to detect undefined names
(F403)
6-6: from .fixtures import * used; unable to detect undefined names
(F403)
7-7: from .utils import * used; unable to detect undefined names
(F403)
12-12: pytest may be undefined, or defined from star imports
(F405)
32-32: assert_project_root may be undefined, or defined from star imports
(F405)
70-70: start_cluster may be undefined, or defined from star imports
(F405)
75-75: create_collection may be undefined, or defined from star imports
(F405)
76-76: wait_collection_exists_and_active_on_all_peers may be undefined, or defined from star imports
(F405)
89-89: assert_http_ok may be undefined, or defined from star imports
(F405)
100-100: assert_http_ok may be undefined, or defined from star imports
(F405)
113-113: assert_http_ok may be undefined, or defined from star imports
(F405)
122-122: assert_http_ok may be undefined, or defined from star imports
(F405)
137-137: assert_http_ok may be undefined, or defined from star imports
(F405)
164-164: assert_http_ok may be undefined, or defined from star imports
(F405)
173-173: upsert_random_points may be undefined, or defined from star imports
(F405)
⏰ Context from checks skipped due to timeout of 90000ms (13)
- GitHub Check: Basic TLS/HTTPS tests
- GitHub Check: test-snapshot-operations-s3-minio
- GitHub Check: test-shard-snapshot-api-s3-minio
- GitHub Check: test-low-resources
- GitHub Check: test-consistency
- GitHub Check: rust-tests (macos-latest)
- GitHub Check: test-consensus-compose
- GitHub Check: integration-tests
- GitHub Check: rust-tests (windows-latest)
- GitHub Check: integration-tests-consensus
- GitHub Check: storage-compat-test
- GitHub Check: rust-tests (ubuntu-latest)
- GitHub Check: lint
🔇 Additional comments (2)
tests/consensus_tests/test_partial_snapshot.py (2)
9-11: LGTM! Clear constant definitions. The constants are well-named and their purpose is clear.
12-40: Details

❌ Incorrect review comment

Test parameterization is comprehensive but has a subtle issue with empty dict handling.

The test is well-parameterized with various scenarios covering different combinations of bootstrap points, recovery modes, upsert operations, and manifest types.

However, in line 39, the pattern `{} if empty_manifest else None` could be problematic because in Python, an empty dict evaluates to `False` in boolean context, which might not give the expected behavior when checking with `if manifest` later.
🏁 Script executed:

```bash
#!/bin/bash
# Check how the empty_manifest parameter is handled in the code
echo "Checking for empty manifest handling in recover_partial function..."
grep -A 5 "def recover_partial" tests/consensus_tests/test_partial_snapshot.py
```

Length of output: 879
Empty dict handling in `recover_partial` is correct.

The function explicitly checks `manifest is None` (not its truthiness), so passing `{}` will bypass the default call to `get_snapshot_manifest` as intended and does not raise any issues. Treating `{}` and `None` distinctly here is the correct behavior.

Likely an incorrect or invalid review comment.
```rust
// TODO: Only retry to move directory, if error kind is `CrossesDevices` or `AlreadyExists`?
//
// match err.kind() {
//     io::ErrorKind::AlreadyExists | io::ErrorKind::CrossesDevices => (),
//     _ => {
//         return Err(CollectionError::service_error(format!(
//             "failed to rename directory {} to {}: {err}",
//             from.display(),
//             to.display(),
//         )));
//     }
// }
```
This was a complex piece of code. Nice refactor! Btw, why did you comment this out? Are you unsure whether it would miss some other errors and not retry for those? If so, maybe drop the commented-out code and leave just the TODO?
This PR implements `create_partial_snapshot`, `restore_partial_snapshot` and `get_partial_snapshot_manifest` endpoints... and some bug fixes. 😅

`create_partial_snapshot` is based on `stream_shard_snapshot` and `restore_partial_snapshot` is based on `upload_shard_snapshot`, but both APIs remove the snapshot file after the request is processed.

TODO:
- there's a bug with partial snapshots: version 0 (e.g., the very first operation on a fresh shard) might produce incorrect partial snapshots (until they are updated to version > 0)
- `must` filter without any conditions)
- clean up if `create_partial_snapshot` fails while creating the snapshot: `tar` archive 😬

All Submissions:
- [ ] Contributions should target the `dev` branch. Did you create your branch from `dev`?

New Feature Submissions:

- [ ] Have you formatted your code locally using the `cargo +nightly fmt --all` command prior to submission?
- [ ] Have you checked your code using the `cargo clippy --all --all-features` command?

Changes to Core Features: