Conversation
📝 Walkthrough

Adds a new `SegmentRecord` type and exports it. Introduces `SegmentEntry::retrieve` and implements it across segment and proxy layers. Reworks vector access into batched APIs (`VectorStorage::read_vectors`, storage-specific `read_vectors`, `Segment::read_vectors`, `vectors_by_offsets`) and updates search, segment ops, segment_holder, shard retrieve/update, and `RecordInternal` conversions to use batched retrieve/read flows and propagate an `is_stopped` stop flag.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@lib/shard/src/retrieve/retrieve_blocking.rs`:
- Around line 31-63: The code updates point_version for every id in
newer_version_points before calling segment.retrieve(), which wrongly marks IDs
not actually returned as applied; change this by collecting the IDs returned by
segment.retrieve() (e.g., into a HashSet or Vec) and only update point_version
and increment applied for those returned IDs; specifically, in the closure
passed to segments_guard.read_points, keep the initial loop that builds
newer_version_points but do NOT call *version_entry.or_default() = version
there, call segment.retrieve(...) and collect each returned record.id into a set
while inserting records into point_records (using RecordInternal::from(record)),
then iterate over that returned-id set to update point_version entries and
increment applied for them (preserving the existing early "already latest" logic
that increments applied when Entry::Occupied and >= version).
In `@lib/shard/src/update.rs`:
- Around line 468-484: The missing-record detection builds missing_record_ids
from stored_records (so it's immediately emptied) — change it to start from the
set of requested IDs (the keys in id_to_point or the original requested IDs) and
then remove each stored_record.id during the stored_records loop; keep the
equality check (point.is_equal_to) and push differing points into
points_to_update as before, and after the loop iterate the remaining
missing_record_ids to push points for IDs not returned by the store; update the
variable initialization in sync_points (referencing missing_record_ids and
id_to_point) so missing IDs are computed from id_to_point.keys() rather than
stored_records.iter().
🧹 Nitpick comments (3)
lib/segment/src/vector_storage/dense/memmap_dense_vector_storage.rs (1)

191-205: Unused `AccessPattern` generic parameter.

The generic parameter `P: AccessPattern` is declared but never used in this implementation. The method always delegates to `read_vectors_async` regardless of the access pattern. If this is intentional for the experimental async scroll feature, consider adding a comment explaining this behavior. Otherwise, the sequential/random access hints could potentially be used to optimize the async read strategy.

Additionally, the `.unwrap()` on line 204 will panic if `read_vectors_async` fails. Based on learnings from this codebase, the io_uring feature is experimental and designed to panic rather than silently fall back. If this is the intended behavior here, a brief comment would clarify the design decision.

lib/shard/src/proxy_segment/segment_entry.rs (1)
350-369: Avoid allocation when there are no deleted points.

You can skip building `filtered_point_ids` if `deleted_points` is empty to avoid an extra allocation and copy on the hot path.

♻️ Suggested tweak

```diff
 fn retrieve(
     &self,
     point_ids: &[PointIdType],
     with_payload: &WithPayload,
     with_vector: &WithVector,
     hw_counter: &HardwareCounterCell,
     is_stopped: &AtomicBool,
 ) -> OperationResult<Vec<SegmentRecord>> {
+    if self.deleted_points.is_empty() {
+        return self.wrapped_segment.get().read().retrieve(
+            point_ids,
+            with_payload,
+            with_vector,
+            hw_counter,
+            is_stopped,
+        );
+    }
     let filtered_point_ids: Vec<PointIdType> = point_ids
         .iter()
         .copied()
         .filter(|id| !self.deleted_points.contains_key(id))
         .collect();
     self.wrapped_segment.get().read().retrieve(
```

lib/segment/src/segment/vectors.rs (1)
20-44: Preserve first lookup error and avoid partial callbacks.

Right now a lookup error still allows callbacks to run and overwrites earlier errors. If an error is returned, it's safer to avoid partial side effects and to keep the first failure for determinism.

♻️ Suggested refactor

```diff
-        let mut error = None;
-        let internal_ids = point_ids
-            .iter()
-            .copied()
-            .stop_if(is_stopped)
-            .filter_map(|point_id| match self.lookup_internal_id(point_id) {
-                Ok(point_offset) => Some(point_offset),
-                Err(err) => {
-                    error = Some(err);
-                    None
-                }
-            });
-        self.vectors_by_offsets(
-            vector_names,
-            internal_ids,
-            hw_counter,
-            |point_offset, vector_internal| {
-                if let Some(point_id) = self.id_tracker.borrow().external_id(point_offset) {
-                    callback(point_id, vector_internal);
-                }
-            },
-        )?;
-        if let Some(err) = error {
-            return Err(err);
-        }
+        let mut error = None;
+        let mut internal_ids = Vec::with_capacity(point_ids.len());
+        for point_id in point_ids.iter().copied().stop_if(is_stopped) {
+            match self.lookup_internal_id(point_id) {
+                Ok(point_offset) => internal_ids.push(point_offset),
+                Err(err) => {
+                    error.get_or_insert(err);
+                    break; // avoid partial callbacks when returning Err
+                }
+            }
+        }
+        if let Some(err) = error {
+            return Err(err);
+        }
+        self.vectors_by_offsets(
+            vector_names,
+            internal_ids,
+            hw_counter,
+            |point_offset, vector_internal| {
+                if let Some(point_id) = self.id_tracker.borrow().external_id(point_offset) {
+                    callback(point_id, vector_internal);
+                }
+            },
+        )?;
```
lib/shard/src/update.rs (outdated)

```rust
let mut missing_record_ids: AHashSet<PointIdType> =
    stored_records.iter().map(|record| record.id).collect();

for stored_record in stored_records {
    missing_record_ids.remove(&stored_record.id);
    let point = id_to_point.get(&stored_record.id).unwrap();
    if !point.is_equal_to(&stored_record) {
        points_to_update.push(*point);
    } else {
        updated += 1;
    }
}

for missing_id in missing_record_ids {
    let point = id_to_point.get(&missing_id).unwrap();
    points_to_update.push(*point);
    updated += 1;
}
```
Fix missing-record detection in sync_points.
missing_record_ids is built from stored_records, then immediately emptied by the loop, so the “missing IDs” branch never runs. This skips updates for IDs that were requested but not returned.
🐛 Proposed fix

```diff
-        let mut missing_record_ids: AHashSet<PointIdType> =
-            stored_records.iter().map(|record| record.id).collect();
+        let mut missing_record_ids: AHashSet<PointIdType> =
+            ids.iter().copied().collect();
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let mut missing_record_ids: AHashSet<PointIdType> =
    ids.iter().copied().collect();
for stored_record in stored_records {
    missing_record_ids.remove(&stored_record.id);
    let point = id_to_point.get(&stored_record.id).unwrap();
    if !point.is_equal_to(&stored_record) {
        points_to_update.push(*point);
        updated += 1;
    }
}
for missing_id in missing_record_ids {
    let point = id_to_point.get(&missing_id).unwrap();
    points_to_update.push(*point);
    updated += 1;
}
```
```rust
pub struct SegmentRecord {
    pub id: PointIdType,
    pub vectors: Option<NamedVectorsOwned>,
    pub payload: Option<Payload>,
}
```
Instead of exposing `vectors` and `payload` functions separately, the segment now exposes a `retrieve` function, which returns this.
The reason for this is that we can't have `impl ..` with `dyn SegmentEntry`, so it is hard to expose a callback function in this trait.
```rust
fn retrieve(
    &self,
    point_ids: &[PointIdType],
    with_payload: &WithPayload,
    with_vector: &WithVector,
    hw_counter: &HardwareCounterCell,
    is_stopped: &AtomicBool,
) -> OperationResult<Vec<SegmentRecord>>;
```
This function is actually the only thing we need; `vectors`, `all_vectors` and `payloads` are now only used in a few not-so-important places, which can be refactored out later.
```rust
let point_id = id_tracker.external_id(point_offset);
// This can happen if point was modified between retrieving and post-processing
// But this function locks the segment, so it can't be modified during its execution
debug_assert!(
    point_id.is_some(),
    "Point with internal ID {point_offset} not found in id tracker"
);
point_id.map(|id| (id, scored_point_offset))
```
Handling of unexpected situations changed a bit in this function, mostly because it is a bit harder to return an error from a batch of points than from a single one.
But I think it is fine, as we don't want to crash production if some edge-case point is broken.
```rust
let mut result = None;
self.vectors_by_offsets(
    vector_name,
    std::iter::once(point_offset),
    hw_counter,
    |_, vector_internal| {
        result = Some(vector_internal);
    },
)?;
Ok(result)
}
```
this is changed to increase test coverage of the underlying `vectors_by_offsets`
```rust
use crate::segment::Segment;
use crate::types::{PointIdType, VectorName};

impl Segment {
```
more vector-related functions can be moved into this file later
```rust
    named_vectors
}

pub fn is_equal_to(&self, segment_record: &SegmentRecord) -> bool {
```
this function is used for deduplication during sync calls
```rust
Self {
    id,
    payload,
    vector: vectors.map(VectorStructInternal::from),
```
this is used to handle the API expectation that points with no vector are displayed as `vector: {}`.
Nice. The change looks significant, assuming I parse the graphs correctly. I did start an effort a few weeks back for concurrent reading+transferring. I think this approach is better, though I might implement the same concurrency on top of this in a separate PR. It'd eliminate waiting on roundtrip time.
agourlay left a comment
Struggled a bit to review the change across all files.
I did validate the integration tests locally with `QDRANT__STORAGE__PERFORMANCE__ASYNC_SCORER=true`.
Also tried some basic benchmarking of scroll `with_vectors` but could not see a difference locally, probably because of my fast SSD.
* implementation of async batch vectors reading
* EXPERIMENT: async-io for reading on scroll
* disable on non-linux
* make retrieve sequential for test
* wip: implement vector reading via callback
* simplify operations, remove duplicates
* use batch retrieve also for post-processing search results
* clippy
* fix tests
* review fixes
* Replace big match statement with simple option filter and equal check
* Inline format arguments

---------

Co-authored-by: timvisee <tim@visee.me>


This is an experimental PR aimed to test the hypothesis that slow reads during shard transfers are caused by sequential reads.
This PR changes how we read vectors for the scroll operation: instead of reading vectors one-by-one sequentially, it reads a batch of vectors. Each batch is processed with `io_uring` if enabled.
Concurrent reads of vectors significantly decrease the time to transfer shards, especially when the receiving side is not empty (see the charts above).