
async scroll #7928

Merged: generall merged 12 commits into dev from async-scroll on Jan 19, 2026

Conversation

@generall (Member) commented Jan 16, 2026

This is an experimental PR aimed at testing the hypothesis that slow reads during shard transfers are caused by sequential reads.

This PR changes how we read vectors for the scroll operation: instead of reading vectors one by one sequentially, it reads them in batches. Each batch is processed with io_uring if enabled.

Concurrent reads of vectors significantly decrease the time needed to transfer shards, especially when the receiving side is not empty (see charts below)
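As a rough sketch of the batched approach (function and type names here are illustrative, not Qdrant's actual API), the scroll path collects the IDs of a page and resolves them in one batched call instead of one storage access per point; the real implementation dispatches each batch to io_uring when enabled:

```rust
use std::collections::HashMap;

// Illustrative stand-in for vector storage; the real storage is mmap/disk-backed.
type VectorStore = HashMap<u64, Vec<f32>>;

// Batched read: one call per scroll page instead of one read per point.
// A backend such as io_uring can then service the batch with concurrent I/O.
fn read_vectors_batched(storage: &VectorStore, ids: &[u64]) -> Vec<(u64, Vec<f32>)> {
    ids.iter()
        .filter_map(|id| storage.get(id).map(|v| (*id, v.clone())))
        .collect()
}

fn main() {
    let storage: VectorStore = [(1, vec![0.1, 0.2]), (2, vec![0.3, 0.4])].into();
    // Point 3 does not exist and is simply skipped.
    let page = read_vectors_batched(&storage, &[1, 2, 3]);
    assert_eq!(page.len(), 2);
    assert_eq!(page[0], (1, vec![0.1, 0.2]));
}
```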

@generall (Member, Author):

left to right: async_io transfer, regular transfer, transfer with madvise=sequential


@generall (Member, Author):

Transfer under higher memory pressure, with UUID point IDs instead of sequential ones.


Left: sequential reads
Right: async IO

@generall generall marked this pull request as ready for review January 17, 2026 21:49
@generall generall requested review from agourlay and timvisee January 17, 2026 21:49
@coderabbitai (bot) commented Jan 17, 2026

📝 Walkthrough

Adds a new SegmentRecord type and exports it. Introduces SegmentEntry::retrieve and implements it across segment and proxy layers. Reworks vector access into batched APIs (VectorStorage::read_vectors, storage-specific read_vectors, Segment::read_vectors, vectors_by_offsets) and updates search, segment ops, segment_holder, shard retrieve/update, and RecordInternal conversions to use batched retrieve/read flows and propagate an is_stopped stop flag.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • agourlay
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)
  • Title check: ❓ Inconclusive. The title 'async scroll' is vague and does not clearly convey the main change in the changeset. Resolution: consider a more descriptive title such as 'Implement batched async vector reads for scroll operations' or 'Change vector retrieval to use batch I/O for scroll operations'.

✅ Passed checks (2 passed)
  • Description check: ✅ Passed. The description explains the motivation and approach for batched vector reads during shard transfers, which aligns with the technical changes in the PR.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.


@coderabbitai (bot) left a comment:

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@lib/shard/src/retrieve/retrieve_blocking.rs`:
- Around line 31-63: The code updates point_version for every id in
newer_version_points before calling segment.retrieve(), which wrongly marks IDs
not actually returned as applied; change this by collecting the IDs returned by
segment.retrieve() (e.g., into a HashSet or Vec) and only update point_version
and increment applied for those returned IDs; specifically, in the closure
passed to segments_guard.read_points, keep the initial loop that builds
newer_version_points but do NOT call *version_entry.or_default() = version
there, call segment.retrieve(...) and collect each returned record.id into a set
while inserting records into point_records (using RecordInternal::from(record)),
then iterate over that returned-id set to update point_version entries and
increment applied for them (preserving the existing early "already latest" logic
that increments applied when Entry::Occupied and >= version).

In `@lib/shard/src/update.rs`:
- Around line 468-484: The missing-record detection builds missing_record_ids
from stored_records (so it's immediately emptied) — change it to start from the
set of requested IDs (the keys in id_to_point or the original requested IDs) and
then remove each stored_record.id during the stored_records loop; keep the
equality check (point.is_equal_to) and push differing points into
points_to_update as before, and after the loop iterate the remaining
missing_record_ids to push points for IDs not returned by the store; update the
variable initialization in sync_points (referencing missing_record_ids and
id_to_point) so missing IDs are computed from id_to_point.keys() rather than
stored_records.iter().
🧹 Nitpick comments (3)
lib/segment/src/vector_storage/dense/memmap_dense_vector_storage.rs (1)

191-205: Unused AccessPattern generic parameter.

The generic parameter P: AccessPattern is declared but never used in this implementation. The method always delegates to read_vectors_async regardless of the access pattern. If this is intentional for the experimental async scroll feature, consider adding a comment explaining this behavior. Otherwise, the sequential/random access hints could potentially be used to optimize the async read strategy.

Additionally, the .unwrap() on line 204 will panic if read_vectors_async fails. Based on learnings from this codebase, the io_uring feature is experimental and designed to panic rather than silently fall back. If this is the intended behavior here, a brief comment would clarify the design decision.

lib/shard/src/proxy_segment/segment_entry.rs (1)

350-369: Avoid allocation when there are no deleted points.
You can skip building filtered_point_ids if deleted_points is empty to avoid an extra allocation and copy on the hot path.

♻️ Suggested tweak
     fn retrieve(
         &self,
         point_ids: &[PointIdType],
         with_payload: &WithPayload,
         with_vector: &WithVector,
         hw_counter: &HardwareCounterCell,
         is_stopped: &AtomicBool,
     ) -> OperationResult<Vec<SegmentRecord>> {
+        if self.deleted_points.is_empty() {
+            return self.wrapped_segment.get().read().retrieve(
+                point_ids,
+                with_payload,
+                with_vector,
+                hw_counter,
+                is_stopped,
+            );
+        }
         let filtered_point_ids: Vec<PointIdType> = point_ids
             .iter()
             .copied()
             .filter(|id| !self.deleted_points.contains_key(id))
             .collect();
         self.wrapped_segment.get().read().retrieve(
lib/segment/src/segment/vectors.rs (1)

20-44: Preserve first lookup error and avoid partial callbacks.

Right now a lookup error still allows callbacks to run and overwrites earlier errors. If an error is returned, it’s safer to avoid partial side effects and to keep the first failure for determinism.

♻️ Suggested refactor
-        let mut error = None;
-        let internal_ids = point_ids
-            .iter()
-            .copied()
-            .stop_if(is_stopped)
-            .filter_map(|point_id| match self.lookup_internal_id(point_id) {
-                Ok(point_offset) => Some(point_offset),
-                Err(err) => {
-                    error = Some(err);
-                    None
-                }
-            });
-        self.vectors_by_offsets(
-            vector_names,
-            internal_ids,
-            hw_counter,
-            |point_offset, vector_internal| {
-                if let Some(point_id) = self.id_tracker.borrow().external_id(point_offset) {
-                    callback(point_id, vector_internal);
-                }
-            },
-        )?;
-        if let Some(err) = error {
-            return Err(err);
-        }
+        let mut error = None;
+        let mut internal_ids = Vec::with_capacity(point_ids.len());
+        for point_id in point_ids.iter().copied().stop_if(is_stopped) {
+            match self.lookup_internal_id(point_id) {
+                Ok(point_offset) => internal_ids.push(point_offset),
+                Err(err) => {
+                    error.get_or_insert(err);
+                    break; // avoid partial callbacks when returning Err
+                }
+            }
+        }
+        if let Some(err) = error {
+            return Err(err);
+        }
+        self.vectors_by_offsets(
+            vector_names,
+            internal_ids,
+            hw_counter,
+            |point_offset, vector_internal| {
+                if let Some(point_id) = self.id_tracker.borrow().external_id(point_offset) {
+                    callback(point_id, vector_internal);
+                }
+            },
+        )?;

Comment on lines +468 to +484

            let mut missing_record_ids: AHashSet<PointIdType> =
                stored_records.iter().map(|record| record.id).collect();

            for stored_record in stored_records {
                missing_record_ids.remove(&stored_record.id);
                let point = id_to_point.get(&stored_record.id).unwrap();
                if !point.is_equal_to(&stored_record) {
                    points_to_update.push(*point);
                    updated += 1;
                }
            }
        })?;

        for missing_id in missing_record_ids {
            let point = id_to_point.get(&missing_id).unwrap();
            points_to_update.push(*point);
            updated += 1;
        }
@coderabbitai (bot):

⚠️ Potential issue | 🟠 Major

Fix missing-record detection in sync_points.

missing_record_ids is built from stored_records, then immediately emptied by the loop, so the “missing IDs” branch never runs. This skips updates for IDs that were requested but not returned.

🐛 Proposed fix
-            let mut missing_record_ids: AHashSet<PointIdType> =
-                stored_records.iter().map(|record| record.id).collect();
+            let mut missing_record_ids: AHashSet<PointIdType> =
+                ids.iter().copied().collect();
📝 Committable suggestion


Suggested change

            let mut missing_record_ids: AHashSet<PointIdType> =
                ids.iter().copied().collect();

            for stored_record in stored_records {
                missing_record_ids.remove(&stored_record.id);
                let point = id_to_point.get(&stored_record.id).unwrap();
                if !point.is_equal_to(&stored_record) {
                    points_to_update.push(*point);
                    updated += 1;
                }
            }

            for missing_id in missing_record_ids {
                let point = id_to_point.get(&missing_id).unwrap();
                points_to_update.push(*point);
                updated += 1;
            }

Comment on lines +6 to +10

pub struct SegmentRecord {
    pub id: PointIdType,
    pub vectors: Option<NamedVectorsOwned>,
    pub payload: Option<Payload>,
}
@generall (Member, Author):

Instead of exposing separate vectors and payload functions, the segment now exposes a retrieve function, which returns this.

The reason is that we can't use `impl ..` arguments with `dyn SegmentEntry`, so it is hard to expose a callback-based function in this trait
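A minimal sketch of the object-safety constraint mentioned above (trait and type names here are illustrative stand-ins, not the actual SegmentEntry API): a method taking a generic `impl FnMut` callback makes a trait unusable behind `dyn`, while returning owned records keeps it dyn-compatible.

```rust
// Illustrative only: these traits are stand-ins, not Qdrant's SegmentEntry.

trait WithCallback {
    // A generic method like the one below would make this trait
    // non-dyn-compatible (it could not be used as `dyn WithCallback`):
    // fn for_each_vector(&self, callback: impl FnMut(u32));
}

trait ObjectSafe {
    // Returning owned records instead keeps the trait dyn-compatible.
    fn retrieve(&self, ids: &[u32]) -> Vec<(u32, String)>;
}

struct Dummy;

impl ObjectSafe for Dummy {
    fn retrieve(&self, ids: &[u32]) -> Vec<(u32, String)> {
        ids.iter().map(|&id| (id, format!("record-{id}"))).collect()
    }
}

fn main() {
    // Usable behind `dyn`, which is what a `dyn SegmentEntry`-style design needs.
    let segment: Box<dyn ObjectSafe> = Box::new(Dummy);
    let records = segment.retrieve(&[1, 2]);
    assert_eq!(records, vec![(1, "record-1".to_string()), (2, "record-2".to_string())]);
}
```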

Comment on lines +148 to +155
fn retrieve(
&self,
point_ids: &[PointIdType],
with_payload: &WithPayload,
with_vector: &WithVector,
hw_counter: &HardwareCounterCell,
is_stopped: &AtomicBool,
) -> OperationResult<Vec<SegmentRecord>>;
@generall (Member, Author):

This function is actually the only thing we need; vectors, all_vectors, and payloads are now used in only a few not-so-important places, which can be refactored out later

Comment on lines +37 to +44

let point_id = id_tracker.external_id(point_offset);
// This can happen if point was modified between retrieving and post-processing
// But this function locks the segment, so it can't be modified during its execution
debug_assert!(
    point_id.is_some(),
    "Point with internal ID {point_offset} not found in id tracker"
);
point_id.map(|id| (id, scored_point_offset))
@generall (Member, Author):

Handling of unexpected situations changed a bit in this function, mostly because it is harder to return an error for a batch of points than for a single one.

But I think it is fine, as we don't want to crash production if some edge-case point is broken.
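A small sketch of the lenient batch post-processing described above (types are simplified stand-ins for the real id tracker): a missing external ID trips a `debug_assert!` in debug builds, but in release builds the point is skipped instead of failing the whole batch.

```rust
use std::collections::HashMap;

// Stand-in for the id tracker: maps internal offsets to external point IDs.
// A missing entry is asserted on in debug builds and skipped in release builds,
// rather than turning the whole batch into an error.
fn resolve_external_ids(
    offsets: &[u32],
    id_tracker: &HashMap<u32, u64>,
) -> Vec<(u64, u32)> {
    offsets
        .iter()
        .filter_map(|&offset| {
            let external = id_tracker.get(&offset).copied();
            debug_assert!(
                external.is_some(),
                "Point with internal ID {offset} not found in id tracker"
            );
            // Skip broken points instead of propagating an error for the batch.
            external.map(|id| (id, offset))
        })
        .collect()
}

fn main() {
    let tracker: HashMap<u32, u64> = [(0, 100), (1, 101)].into();
    let resolved = resolve_external_ids(&[0, 1], &tracker);
    assert_eq!(resolved, vec![(100, 0), (101, 1)]);
}
```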

Comment on lines +387 to +397

let mut result = None;
self.vectors_by_offsets(
    vector_name,
    std::iter::once(point_offset),
    hw_counter,
    |_, vector_internal| {
        result = Some(vector_internal);
    },
)?;
Ok(result)
}
@generall (Member, Author):

This is changed to increase test coverage of the underlying vectors_by_offsets.

use crate::segment::Segment;
use crate::types::{PointIdType, VectorName};

impl Segment {
@generall (Member, Author):

More vector-related functions can be moved into this file later.

named_vectors
}

pub fn is_equal_to(&self, segment_record: &SegmentRecord) -> bool {
@generall (Member, Author):

This function is used for deduplication during sync calls.

Self {
    id,
    payload,
    vector: vectors.map(VectorStructInternal::from),
@generall (Member, Author):

This is used to handle the API expectation that points with no vectors are displayed as vector: {}.

@timvisee (Member):

Nice. The change looks significant, assuming I parse the graphs correctly.

I did start an effort a few weeks back on concurrent reading+transferring. I think this approach is better, though I might implement the same concurrency on top of this in a separate PR. It'd eliminate waiting on round-trip time.

@agourlay (Member) left a comment:

Struggled a bit to review the change across all files.

I did validate the integration tests locally with QDRANT__STORAGE__PERFORMANCE__ASYNC_SCORER=true

Also tried some basic benchmarking of scroll with_vectors but could not see a difference locally, probably because of my fast SSD.

@generall generall merged commit 21deeec into dev Jan 19, 2026
15 checks passed
@generall generall deleted the async-scroll branch January 19, 2026 17:43
generall added a commit that referenced this pull request Feb 9, 2026
* implementation of async batch vectors reading

* EXPERIMENT: async-io for reading on scroll

* disable on non-linux

* make retrieve sequential for test

* wip: implement vector reading via callback

* simplify operations, remove duplicates

* use batch retrieve also for post-processing search results

* clippy

* fix tests

* review fixes

* Replace big match statement with simple option filter and equal check

* Inline format arguments

---------

Co-authored-by: timvisee <tim@visee.me>
@timvisee timvisee mentioned this pull request Feb 17, 2026
5 tasks
3 participants