Limit concurrent calls to load_segment #7233

Merged
timvisee merged 2 commits into dev from concurrent_load_segment on Sep 11, 2025
Conversation

@xzfc (Member) commented Sep 9, 2025

Problem

The load_segment() function is executed in parallel per segment.

load_handlers.push(tokio::task::spawn_blocking(move || {
    // let _guard = semaphore_clone.lock();
    let segment = load_segment(&segment_path, &AtomicBool::new(false))?;

Inside load_segment(), the migration away from RocksDB is performed (gated by a feature flag for now).

#[cfg(feature = "rocksdb")]
{
    if common::flags::feature_flags().migrate_rocksdb_vector_storage {
        migrate_all_rocksdb_dense_vector_storages(path, &mut segment, &mut segment_state)?;
        migrate_all_rocksdb_sparse_vector_storages(path, &mut segment, &mut segment_state)?;
    }
    if common::flags::feature_flags().migrate_rocksdb_payload_storage {
        migrate_rocksdb_payload_storage(path, &mut segment, &mut segment_state)?;
    }
}

That means with 100 segments we'll get 100 parallel migrations once we enable these feature flags.

Solution

This PR implements the following limit:

/// Maximum number of segments to load concurrently when loading a collection.
pub const MAX_CONCURRENT_SEGMENT_LOADS: usize = 8;
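The PR itself enforces this cap by refilling a tokio JoinSet (quoted in the review below); the same bounded fan-out can be sketched with plain std threads pulling work from a shared queue. This is only an illustration: `load_all` and the numeric "paths" are hypothetical stand-ins, not Qdrant APIs.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the real load_segment().
fn load_segment(path_id: usize) -> Result<usize, String> {
    Ok(path_id)
}

/// Load all segments with at most `limit` loads running at once,
/// using a fixed pool of worker threads draining a shared queue.
fn load_all(paths: Vec<usize>, limit: usize) -> Vec<Result<usize, String>> {
    let queue = Arc::new(Mutex::new(paths.into_iter()));
    let (tx, rx) = mpsc::channel();
    let mut workers = Vec::new();
    for _ in 0..limit {
        let queue = Arc::clone(&queue);
        let tx = tx.clone();
        workers.push(thread::spawn(move || loop {
            // Take the next path, holding the lock only briefly.
            let next = queue.lock().unwrap().next();
            match next {
                Some(path) => tx.send(load_segment(path)).unwrap(),
                None => break,
            }
        }));
    }
    drop(tx); // channel closes once all workers are done
    let results: Vec<_> = rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    results
}

fn main() {
    // 100 segments, at most 8 in flight at any moment.
    let results = load_all((0..100).collect(), 8);
    println!("loaded {} segments", results.len());
}
```

With 100 segments this still performs every load, but never more than 8 migrations run concurrently.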

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
lib/collection/src/shards/local_shard/mod.rs (2)

301-327: Make segment scheduling deterministic (sort paths) and pre-collect

read_dir order is OS-dependent; sorting improves reproducibility and test stability.

Apply within this hunk:

-        let mut segment_paths = segment_paths
-            .into_iter()
+        let mut segment_paths: Vec<PathBuf> = segment_paths
+            .into_iter()
             .filter(|entry| {
@@
-            })
-            .map(|entry| entry.path());
+            })
+            .map(|entry| entry.path())
+            .collect();
+        // Deterministic ordering to improve reproducibility and testability
+        segment_paths.sort_unstable();
+        // Turn into an iterator for the scheduling loop below
+        let mut segment_paths = segment_paths.into_iter();

328-353: Add path-context to errors from load/repair/index rebuild; simplify Ok

Gives actionable diagnostics per segment and avoids relying on upstream context; also simplify CollectionResult::Ok to Ok.

-        let load_segment_task = |segment_path: PathBuf| {
-            let payload_index_schema = payload_index_schema.clone();
-            move || {
-                let segment = load_segment(&segment_path, &AtomicBool::new(false))?;
+        let load_segment_task = |segment_path: PathBuf| {
+            let payload_index_schema = payload_index_schema.clone();
+            move || {
+                let segment = load_segment(&segment_path, &AtomicBool::new(false)).map_err(|err| {
+                    CollectionError::service_error(format!(
+                        "failed to load segment {}: {err}",
+                        segment_path.display()
+                    ))
+                })?;
@@
-                segment.check_consistency_and_repair()?;
+                segment.check_consistency_and_repair().map_err(|err| {
+                    CollectionError::service_error(format!(
+                        "failed consistency repair for {}: {err}",
+                        segment_path.display()
+                    ))
+                })?;
@@
-                if rebuild_payload_index {
-                    segment
-                        .update_all_field_indices(&payload_index_schema.read().schema.clone())?;
-                }
+                if rebuild_payload_index {
+                    segment
+                        .update_all_field_indices(&payload_index_schema.read().schema.clone())
+                        .map_err(|err| {
+                            CollectionError::service_error(format!(
+                                "failed to rebuild payload indices for {}: {err}",
+                                segment_path.display()
+                            ))
+                        })?;
+                }
@@
-                CollectionResult::Ok(Some(segment))
+                Ok(Some(segment))
             }
         };
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c05df46 and 514d5d3.

📒 Files selected for processing (2)
  • lib/collection/src/shards/local_shard/mod.rs (5 hunks)
  • lib/common/common/src/defaults.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (.github/review-rules.md)

**/*.rs: Prefer explicit SomeType::from(x) over implicit x.into() in Rust code
Do not use transmute_from_u8, transmute_to_u8, transmute_from_u8_to_slice, transmute_from_u8_to_mut_slice, transmute_to_u8_slice in new code; use bytemuck or zerocopy instead

Files:

  • lib/common/common/src/defaults.rs
  • lib/collection/src/shards/local_shard/mod.rs
**/src/**/*.rs

📄 CodeRabbit inference engine (.github/review-rules.md)

**/src/**/*.rs: Prefer exhaustive match arms over a catch-all _ arm to avoid missing new enum variants (except in tests/benchmarks or when provably safe)
Prefer explicit field ignoring with : _ over .. in struct patterns (except in tests/benchmarks or when provably safe)

Files:

  • lib/common/common/src/defaults.rs
  • lib/collection/src/shards/local_shard/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: rust-tests (macos-latest)
  • GitHub Check: rust-tests-no-rocksdb (ubuntu-latest)
  • GitHub Check: rust-tests (windows-latest)
  • GitHub Check: rust-tests (ubuntu-latest)
  • GitHub Check: storage-compat-test
  • GitHub Check: lint
  • GitHub Check: test-consensus-compose
  • GitHub Check: test-consistency
  • GitHub Check: e2e-tests
  • GitHub Check: test-shard-snapshot-api-s3-minio
  • GitHub Check: integration-tests-consensus
  • GitHub Check: integration-tests
🔇 Additional comments (4)
lib/common/common/src/defaults.rs (1)

16-18: Concurrency cap constant looks good

Clear doc, sensible default, and scoped in a shared defaults module.

lib/collection/src/shards/local_shard/mod.rs (3)

24-24: Import of MAX_CONCURRENT_SEGMENT_LOADS is correct

Matches the new shared default; keeps the cap centralized.


47-47: JoinSet import is appropriate for bounded fan-out

Right tool for managing the concurrent blocking tasks.


355-358: Initial bounded scheduling is correct

Spawns up to MAX_CONCURRENT_SEGMENT_LOADS; good use of spawn_blocking for I/O-heavy work.

Comment on lines 362 to 389
        while let Some(result) = load_segment_tasks.join_next().await {
            let segment = result??;

            if let Some(segment) = segment {
                collection_config_read
                    .params
                    .vectors
                    .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
                collection_config_read
                    .params
                    .sparse_vectors
                    .as_ref()
                    .map(|sparse_vectors| {
                        check_sparse_compatible_with_segment_config(
                            sparse_vectors,
                            &segment.config().sparse_vector_data,
                            true,
                        )
                    })
                    .unwrap_or(Ok(()))?;

                segment_holder.add_new(segment);
            }

            if let Some(segment_path) = segment_paths.next() {
                load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
            }
        }

⚠️ Potential issue

Do not early-return on first error; drain JoinSet and don’t spawn new tasks after failure

Using result?? can:

  • Rely on an implicit From -> CollectionError (often absent) and drop error context.
  • Return early and drop JoinSet. For spawn_blocking, aborting the JoinHandle does not stop the underlying blocking work; those tasks keep running in the background, causing load/migration churn after the shard load has failed.

Handle JoinError explicitly, capture the first error, stop spawning new tasks, drain the set, then return the error.

-        while let Some(result) = load_segment_tasks.join_next().await {
-            let segment = result??;
-
-            if let Some(segment) = segment {
-                collection_config_read
-                    .params
-                    .vectors
-                    .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
-                collection_config_read
-                    .params
-                    .sparse_vectors
-                    .as_ref()
-                    .map(|sparse_vectors| {
-                        check_sparse_compatible_with_segment_config(
-                            sparse_vectors,
-                            &segment.config().sparse_vector_data,
-                            true,
-                        )
-                    })
-                    .unwrap_or(Ok(()))?;
-
-                segment_holder.add_new(segment);
-            }
-
-            if let Some(segment_path) = segment_paths.next() {
-                load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
-            }
-        }
+        let mut first_err: Option<CollectionError> = None;
+        while let Some(result) = load_segment_tasks.join_next().await {
+            match result {
+                Err(join_err) => {
+                    let err = CollectionError::service_error(format!(
+                        "failed to join segment load task: {join_err}"
+                    ));
+                    if first_err.is_none() {
+                        first_err = Some(err);
+                    }
+                }
+                Ok(Err(load_err)) => {
+                    if first_err.is_none() {
+                        first_err = Some(load_err);
+                    }
+                }
+                Ok(Ok(Some(segment))) => {
+                    collection_config_read
+                        .params
+                        .vectors
+                        .check_compatible_with_segment_config(&segment.config().vector_data, true)?;
+                    collection_config_read
+                        .params
+                        .sparse_vectors
+                        .as_ref()
+                        .map(|sparse_vectors| {
+                            check_sparse_compatible_with_segment_config(
+                                sparse_vectors,
+                                &segment.config().sparse_vector_data,
+                                true,
+                            )
+                        })
+                        .unwrap_or(Ok(()))?;
+                    segment_holder.add_new(segment);
+                }
+                Ok(Ok(None)) => {}
+            }
+
+            // Maintain concurrency only if no error has been observed.
+            if first_err.is_none() {
+                if let Some(segment_path) = segment_paths.next() {
+                    load_segment_tasks.spawn_blocking(load_segment_task(segment_path));
+                }
+            }
+        }
+        if let Some(err) = first_err {
+            return Err(err);
+        }

Optional follow-up: wire a shared abort flag (e.g., Arc&lt;AtomicBool&gt;) into load_segment and set it when first_err is recorded to let in-flight loads exit early.

Committable suggestion skipped: line range outside the PR's diff.
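As a rough illustration of that follow-up (not the PR's code), a shared `Arc<AtomicBool>` abort flag lets in-flight loads bail out once a failure is recorded. Plain `std::thread` workers stand in for `spawn_blocking` here, and `load_segment`/`load_with_abort` are stubs that fail deterministically on one segment:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for load_segment(): honors a stop flag
/// and fails deterministically on one segment.
fn load_segment(id: usize, stopped: &AtomicBool) -> Result<usize, String> {
    if stopped.load(Ordering::Relaxed) {
        return Err(format!("segment {id}: aborted"));
    }
    if id == 3 {
        return Err(format!("segment {id}: corrupted"));
    }
    Ok(id)
}

/// Returns (error count, final flag state) after loading `n` segments.
fn load_with_abort(n: usize) -> (usize, bool) {
    let stopped = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..n)
        .map(|id| {
            let stopped = Arc::clone(&stopped);
            thread::spawn(move || {
                let result = load_segment(id, &stopped);
                if result.is_err() {
                    // First failure tells remaining loads to exit early.
                    stopped.store(true, Ordering::Relaxed);
                }
                result
            })
        })
        .collect();
    let errors = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(Result::is_err)
        .count();
    let flag = stopped.load(Ordering::Relaxed);
    (errors, flag)
}

fn main() {
    let (errors, stopped) = load_with_abort(8);
    println!("errors: {errors}, stopped: {stopped}");
}
```

At least one load fails (segment 3), the flag ends up set, and any load that starts after the flag is set returns immediately instead of doing wasted migration work.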

coderabbitai[bot]

This comment was marked as resolved.

@qdrant qdrant deleted a comment from coderabbitai bot Sep 11, 2025
@timvisee timvisee (Member) left a comment

Works great. Thanks!
Works great. Thanks!

@timvisee timvisee merged commit 3c9e36f into dev Sep 11, 2025
25 of 28 checks passed
@timvisee timvisee deleted the concurrent_load_segment branch September 11, 2025 11:57
timvisee pushed a commit that referenced this pull request Sep 29, 2025
* Limit concurrent calls to `load_segment`

* Use StreamExt::buffer_unordered
@timvisee timvisee mentioned this pull request Sep 29, 2025