Conversation
Actionable comments posted: 1
🧹 Nitpick comments (2)
lib/collection/src/shards/local_shard/mod.rs (2)
301-327: Make segment scheduling deterministic (sort paths) and pre-collect

read_dir order is OS-dependent; sorting improves reproducibility and test stability.
Apply within this hunk:
```diff
-        let mut segment_paths = segment_paths
-            .into_iter()
+        let mut segment_paths: Vec<PathBuf> = segment_paths
+            .into_iter()
             .filter(|entry| {
@@
-            })
-            .map(|entry| entry.path());
+            })
+            .map(|entry| entry.path())
+            .collect();
+        // Deterministic ordering to improve reproducibility and testability
+        segment_paths.sort_unstable();
+        // Turn into an iterator for the scheduling loop below
+        let mut segment_paths = segment_paths.into_iter();
```
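As a self-contained sketch of the same idea (function name is illustrative, not from the PR): collect the OS-ordered `read_dir` entries into a `Vec` and sort it, so every run and every test sees segments in the same order.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// read_dir yields entries in an OS-dependent order; collecting and
// sorting makes the downstream scheduling loop deterministic.
fn sorted_segment_paths(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut paths: Vec<PathBuf> = fs::read_dir(dir)?
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .collect();
    paths.sort_unstable();
    Ok(paths)
}
```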
328-353: Add path-context to errors from load/repair/index rebuild; simplify Ok

Gives actionable diagnostics per segment and avoids relying on upstream context; also simplify CollectionResult::Ok to Ok.

```diff
-        let load_segment_task = |segment_path: PathBuf| {
-            let payload_index_schema = payload_index_schema.clone();
-            move || {
-                let segment = load_segment(&segment_path, &AtomicBool::new(false))?;
+        let load_segment_task = |segment_path: PathBuf| {
+            let payload_index_schema = payload_index_schema.clone();
+            move || {
+                let segment = load_segment(&segment_path, &AtomicBool::new(false)).map_err(|err| {
+                    CollectionError::service_error(format!(
+                        "failed to load segment {}: {err}",
+                        segment_path.display()
+                    ))
+                })?;
@@
-                segment.check_consistency_and_repair()?;
+                segment.check_consistency_and_repair().map_err(|err| {
+                    CollectionError::service_error(format!(
+                        "failed consistency repair for {}: {err}",
+                        segment_path.display()
+                    ))
+                })?;
@@
-                if rebuild_payload_index {
-                    segment
-                        .update_all_field_indices(&payload_index_schema.read().schema.clone())?;
-                }
+                if rebuild_payload_index {
+                    segment
+                        .update_all_field_indices(&payload_index_schema.read().schema.clone())
+                        .map_err(|err| {
+                            CollectionError::service_error(format!(
+                                "failed to rebuild payload indices for {}: {err}",
+                                segment_path.display()
+                            ))
+                        })?;
+                }
@@
-                CollectionResult::Ok(Some(segment))
+                Ok(Some(segment))
             }
         };
```
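The map_err pattern in the suggestion can be factored into a small helper. This is a hypothetical stand-alone sketch (CollectionError::service_error is modeled as a plain String here):

```rust
use std::path::Path;

// Attach the action and the segment path to any error, so a failure
// in one of many parallel loads is immediately attributable.
fn with_path_context<T, E: std::fmt::Display>(
    result: Result<T, E>,
    action: &str,
    path: &Path,
) -> Result<T, String> {
    result.map_err(|err| format!("{action} {}: {err}", path.display()))
}
```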
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
lib/collection/src/shards/local_shard/mod.rs (5 hunks)
lib/common/common/src/defaults.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs
📄 CodeRabbit inference engine (.github/review-rules.md)
**/*.rs: Prefer explicit SomeType::from(x) over implicit x.into() in Rust code
Do not use transmute_from_u8, transmute_to_u8, transmute_from_u8_to_slice, transmute_from_u8_to_mut_slice, transmute_to_u8_slice in new code; use bytemuck or zerocopy instead
Files:
lib/common/common/src/defaults.rs
lib/collection/src/shards/local_shard/mod.rs
**/src/**/*.rs
📄 CodeRabbit inference engine (.github/review-rules.md)
**/src/**/*.rs: Prefer exhaustive match arms over a catch-all _ arm to avoid missing new enum variants (except in tests/benchmarks or when provably safe)
Prefer explicit field ignoring with : _ over .. in struct patterns (except in tests/benchmarks or when provably safe)
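As a minimal sketch of those two rules (all types here are hypothetical, not from the codebase):

```rust
enum ShardState {
    Active,
    Partial,
    Listener,
}

struct ShardInfo {
    state: ShardState,
    peer_id: u64,
}

fn describe(info: &ShardInfo) -> &'static str {
    // Explicit `peer_id: _` instead of `..`: adding a field to
    // ShardInfo forces this pattern to be revisited.
    let ShardInfo { state, peer_id: _ } = info;
    // Exhaustive match with no catch-all `_`: a new ShardState variant
    // becomes a compile error here instead of a silent fallthrough.
    match state {
        ShardState::Active => "active",
        ShardState::Partial => "partial",
        ShardState::Listener => "listener",
    }
}
```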
Files:
lib/common/common/src/defaults.rs
lib/collection/src/shards/local_shard/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: rust-tests (macos-latest)
- GitHub Check: rust-tests-no-rocksdb (ubuntu-latest)
- GitHub Check: rust-tests (windows-latest)
- GitHub Check: rust-tests (ubuntu-latest)
- GitHub Check: storage-compat-test
- GitHub Check: lint
- GitHub Check: test-consensus-compose
- GitHub Check: test-consistency
- GitHub Check: e2e-tests
- GitHub Check: test-shard-snapshot-api-s3-minio
- GitHub Check: integration-tests-consensus
- GitHub Check: integration-tests
🔇 Additional comments (4)
lib/common/common/src/defaults.rs (1)
16-18: Concurrency cap constant looks good

Clear doc, sensible default, and scoped in a shared defaults module.
lib/collection/src/shards/local_shard/mod.rs (3)
24-24: Import of MAX_CONCURRENT_SEGMENT_LOADS is correct

Matches the new shared default; keeps the cap centralized.

47-47: JoinSet import is appropriate for bounded fan-out

Right tool for managing the concurrent blocking tasks.

355-358: Initial bounded scheduling is correct

Spawns up to MAX_CONCURRENT_SEGMENT_LOADS; good use of spawn_blocking for I/O-heavy work.
```rust
while let Some(result) = load_segment_tasks.join_next().await {
    let segment = result??;

    if let Some(segment) = segment {
        collection_config_read
            .params
            .vectors
            .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
        collection_config_read
            .params
            .sparse_vectors
            .as_ref()
            .map(|sparse_vectors| {
                check_sparse_compatible_with_segment_config(
                    sparse_vectors,
                    &segment.config().sparse_vector_data,
                    true,
                )
            })
            .unwrap_or(Ok(()))?;

        segment_holder.add_new(segment);
    }

    if let Some(segment_path) = segment_paths.next() {
        load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
    }
}
```
Do not early-return on first error; drain JoinSet and don’t spawn new tasks after failure
Using result?? can:
- Rely on an implicit `From<JoinError>` impl for CollectionError (often absent) and drop error context.
- Return early and drop the JoinSet. For spawn_blocking, aborting the JoinHandle does not stop the underlying blocking work; those tasks keep running in the background, causing load/migration churn after the shard load has failed.
Handle JoinError explicitly, capture the first error, stop spawning new tasks, drain the set, then return the error.
```diff
-    while let Some(result) = load_segment_tasks.join_next().await {
-        let segment = result??;
-
-        if let Some(segment) = segment {
-            collection_config_read
-                .params
-                .vectors
-                .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
-            collection_config_read
-                .params
-                .sparse_vectors
-                .as_ref()
-                .map(|sparse_vectors| {
-                    check_sparse_compatible_with_segment_config(
-                        sparse_vectors,
-                        &segment.config().sparse_vector_data,
-                        true,
-                    )
-                })
-                .unwrap_or(Ok(()))?;
-
-            segment_holder.add_new(segment);
-        }
-
-        if let Some(segment_path) = segment_paths.next() {
-            load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
-        }
-    }
+    let mut first_err: Option<CollectionError> = None;
+    while let Some(result) = load_segment_tasks.join_next().await {
+        match result {
+            Err(join_err) => {
+                let err = CollectionError::service_error(format!(
+                    "failed to join segment load task: {join_err}"
+                ));
+                if first_err.is_none() {
+                    first_err = Some(err);
+                }
+            }
+            Ok(Err(load_err)) => {
+                if first_err.is_none() {
+                    first_err = Some(load_err);
+                }
+            }
+            Ok(Ok(Some(segment))) => {
+                collection_config_read
+                    .params
+                    .vectors
+                    .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
+                collection_config_read
+                    .params
+                    .sparse_vectors
+                    .as_ref()
+                    .map(|sparse_vectors| {
+                        check_sparse_compatible_with_segment_config(
+                            sparse_vectors,
+                            &segment.config().sparse_vector_data,
+                            true,
+                        )
+                    })
+                    .unwrap_or(Ok(()))?;
+                segment_holder.add_new(segment);
+            }
+            Ok(Ok(None)) => {}
+        }
+
+        // Maintain concurrency only if no error has been observed.
+        if first_err.is_none() {
+            if let Some(segment_path) = segment_paths.next() {
+                load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
+            }
+        }
+    }
+    if let Some(err) = first_err {
+        return Err(err);
+    }
```

Optional follow-up: wire a shared abort flag (e.g., `Arc<AtomicBool>`) into load_segment and set it when first_err is recorded to let in-flight loads exit early.
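That follow-up can be sketched with std only; the names below are hypothetical (the real load_segment takes more arguments and returns real segment types):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Simplified load: check the shared stop flag and bail out before
// doing expensive work once a failure has been recorded elsewhere.
fn load_segment_checked(stopped: &AtomicBool, path: &str) -> Result<Option<String>, String> {
    if stopped.load(Ordering::Relaxed) {
        // Another task already failed; skip the load/migration work.
        return Ok(None);
    }
    Ok(Some(format!("loaded {path}")))
}

fn demo() -> (Option<String>, Option<String>) {
    let stopped = Arc::new(AtomicBool::new(false));
    let first = load_segment_checked(&stopped, "seg_0").unwrap();
    stopped.store(true, Ordering::Relaxed); // first_err was recorded
    let second = load_segment_checked(&stopped, "seg_1").unwrap();
    (first, second)
}
```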
Committable suggestion skipped: line range outside the PR's diff.
* Limit concurrent calls to `load_segment`
* Use StreamExt::buffer_unordered
Problem
The `load_segment()` function is executed in parallel per segment.

qdrant/lib/collection/src/shards/local_shard/mod.rs
Lines 335 to 338 in 8235a76
Inside `load_segment()`, the migration from RocksDB is performed (gated by a feature flag for now).

qdrant/lib/segment/src/segment_constructor/segment_constructor_base.rs
Lines 751 to 761 in 8235a76
That means with 100 segments we'll get 100 parallel migrations once we enable these feature flags.
Solution
This PR implements the following limit:
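The PR itself caps in-flight loads with StreamExt::buffer_unordered from the futures crate. As a dependency-free illustration of the same bounding idea, here is a std::thread sketch that never runs more than `limit` loads at once (function and data names are hypothetical; `limit` plays the role of MAX_CONCURRENT_SEGMENT_LOADS):

```rust
use std::thread;

// Process paths in waves of at most `limit` parallel workers, so 100
// segments never trigger 100 concurrent loads/migrations.
fn load_all_bounded(paths: &[String], limit: usize) -> Vec<String> {
    let mut results = Vec::with_capacity(paths.len());
    for chunk in paths.chunks(limit) {
        thread::scope(|scope| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|path| scope.spawn(move || format!("loaded {path}")))
                .collect();
            for handle in handles {
                results.push(handle.join().expect("load worker panicked"));
            }
        });
    }
    results
}
```

Unlike buffer_unordered, this wave-based variant waits for a whole chunk before starting the next one; the stream version refills each slot as soon as a future completes.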