
Persist applied seq at regular interval #7976

Merged
agourlay merged 17 commits into dev from save-applied-seq on Jan 28, 2026


Conversation

@agourlay
Member

@agourlay agourlay commented Jan 23, 2026

In the context of the upcoming update queue, we need a clear view of which updates have been applied but not yet flushed.

Concretely, the blue elements in the WAL diagram below.

[image: WAL diagram]

Refer to the design doc for more information: https://www.notion.so/qdrant/Update-queue-2ee674779d3380b18636dd858ceb9099

To that end, this PR adds tracking of the latest applied seq on a per-local-shard basis.

I have decided to use a JSON object format so that we can extend it in the future if necessary.

Saving this information on disk requires strong durability; to amortize the cost of fsync, we perform it only once every 64 updates.
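The amortization scheme above can be sketched roughly as follows. This is a minimal stand-in, not the actual `AppliedSeqHandler`: the `saves` counter here stands in for the real "serialize applied_seq.json and fsync" side effect, and the type and field names are illustrative.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// One durable save per SAVE_INTERVAL in-memory updates.
const SAVE_INTERVAL: u64 = 64;

struct AppliedSeqTracker {
    op_num: AtomicU64,       // latest applied sequence number (in memory)
    update_count: AtomicU64, // updates since the last durable save
    saves: AtomicU64,        // stand-in for the JSON write + fsync side effect
}

impl AppliedSeqTracker {
    fn new() -> Self {
        Self {
            op_num: AtomicU64::new(0),
            update_count: AtomicU64::new(0),
            saves: AtomicU64::new(0),
        }
    }

    fn update(&self, op_num: u64) {
        // Always record the latest seq in memory.
        self.op_num.store(op_num, Ordering::Relaxed);
        let prev = self.update_count.fetch_add(1, Ordering::Relaxed);
        // Only pay for durability every SAVE_INTERVAL-th update.
        if prev + 1 >= SAVE_INTERVAL {
            self.saves.fetch_add(1, Ordering::Relaxed);
            self.update_count.store(0, Ordering::Relaxed);
        }
    }
}
```

With 128 updates this performs exactly two durable saves, which is why preliminary benchmarks would be expected to show little impact on update throughput.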

Preliminary benchmark results show that this does not impact the performance of updates.

This value is currently only written and not used for anything.

Future work

Use the last applied_seq during restart.

@agourlay agourlay force-pushed the save-applied-seq branch 3 times, most recently from 0d8a486 to b6a9a36 on January 26, 2026 09:51
@agourlay agourlay marked this pull request as ready for review January 26, 2026 09:59
@agourlay agourlay requested review from IvanPleshkov, generall and timvisee and removed request for generall January 26, 2026 09:59
@coderabbitai
Contributor

coderabbitai bot commented Jan 26, 2026

📝 Walkthrough

Walkthrough

Adds per-shard applied-sequence tracking and persistence. Introduces update_workers::applied_seq::AppliedSeqHandler (in-memory AtomicU64 op_num, intervaled SaveOnDisk persistence, load_or_init, update, path accessor, and tests). Wires AppliedSeqHandler into UpdateHandler (new async constructor, stored Arc, pub applied_seq()), UpdateWorkers (update and optimization flows call update), LocalShard initialization, shard file utilities, move/clear flows, and snapshot creation/merge so applied_seq.json is tracked, moved, and archived alongside existing shard data.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • agourlay
  • ffuugoo
🚥 Pre-merge checks: 3 passed

  • Title check — ✅ Passed: The title clearly and specifically summarizes the main change: introducing persistence of the applied sequence at regular intervals, which is the core functionality added across all modified files.
  • Description check — ✅ Passed: The description is well-related to the changeset, explaining the motivation, design decisions (JSON format, 64-update intervals), performance considerations, and future work plans.
  • Docstring coverage — ✅ Passed: Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@lib/collection/src/update_workers/applied_seq.rs`:
- Around line 63-74: The debug assertion in save (applied_seq.rs -> fn save,
inside self.file.write closure) can panic on clock skew; replace
debug_assert!(current.timestamp <= timestamp) with a runtime check that if
current.timestamp > timestamp logs a warning (e.g., via log::warn! or the
project logger) including both values and avoids lowering current.timestamp
(i.e., keep current.timestamp as-is); otherwise proceed to set current.op_num =
op_num and current.timestamp = timestamp. Ensure the warning message is
descriptive and include the symbol names current.timestamp and timestamp so it's
easy to debug.
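The suggested replacement can be sketched as follows. The `Record` struct and field names are illustrative stand-ins for the real on-disk state, and `eprintln!` stands in for the project logger's `log::warn!`; the point is that clock skew is tolerated at runtime instead of panicking, and the stored timestamp is never lowered.

```rust
// Illustrative persisted state: sequence number plus a wall-clock timestamp.
struct Record {
    op_num: u64,
    timestamp: u64,
}

// Apply a save without panicking on clock skew: if the clock went
// backwards, warn and keep the newer stored timestamp as-is.
fn apply_save(current: &mut Record, op_num: u64, timestamp: u64) {
    if current.timestamp > timestamp {
        // log::warn! in the real code; include both values for debugging.
        eprintln!(
            "clock skew while saving applied_seq: current.timestamp {} > timestamp {}",
            current.timestamp, timestamp
        );
        current.op_num = op_num;
        // current.timestamp is intentionally left untouched.
    } else {
        current.op_num = op_num;
        current.timestamp = timestamp;
    }
}
```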
🧹 Nitpick comments (1)
lib/collection/src/update_workers/applied_seq.rs (1)

78-88: Minor race window in interval-based save.

Between fetch_add and store(0), concurrent calls can both exceed the threshold and trigger multiple saves. This is benign (extra fsyncs, no data loss) but could be tightened with compare_exchange if strict interval adherence becomes important.

Alternative using compare_exchange
     pub fn update(&self, op_num: u64) -> CollectionResult<()> {
         // update in-memory
         self.op_num.store(op_num, Ordering::Relaxed);
         let prev_count = self.update_count.fetch_add(1, Ordering::Relaxed);
         // update on disk according to interval to amortize fsync
-        if prev_count + 1 >= APPLIED_SEQ_SAVE_INTERVAL {
+        let new_count = prev_count + 1;
+        if new_count >= APPLIED_SEQ_SAVE_INTERVAL
+            && self
+                .update_count
+                .compare_exchange(new_count, 0, Ordering::Relaxed, Ordering::Relaxed)
+                .is_ok()
+        {
             self.save(op_num)?;
-            self.update_count.store(0, Ordering::Relaxed);
         }
         Ok(())
     }

Member

@timvisee timvisee left a comment

Thanks for the tests too!

@agourlay
Member Author

I have pushed a significant change to handle the case of failing to load.
Please review again 🙏

update_tracker: UpdateTracker,
) -> UpdateHandler {
) -> Self {
let wal_first_index = wal.lock().await.first_index();
Member Author

Needed to make the function async to lock the WAL, because using blocking_lock caused:

Panic occurred in file lib/collection/src/update_handler.rs at line 158: Cannot block the current thread from within a runtime. This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks.

Contributor

I expect this function is called after replay and WAL truncation, isn't it?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@lib/collection/src/update_workers/applied_seq.rs`:
- Around line 62-117: The debug assertion in load_or_init currently compares
existing_op_num to wal_first_index allowing an off-by-one; change the
debug_assert! in load_or_init to compare existing_op_num against
last_applied_wal_index (the variable defined as
wal_first_index.saturating_sub(1)) and update the assertion message to reference
last_applied_wal_index so the condition enforces existing_op_num <=
last_applied_wal_index (use the same variable names: existing_op_num,
last_applied_wal_index, and debug_assert).
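The off-by-one arithmetic behind this suggestion can be illustrated with two small helpers. These are hypothetical stand-ins for the real `load_or_init` logic, assuming (as the comment does) that the last already-applied index is `wal_first_index.saturating_sub(1)`:

```rust
// The WAL's first index minus one is the last index that was already
// applied (saturating at 0 for an empty/fresh WAL).
fn last_applied_wal_index(wal_first_index: u64) -> u64 {
    wal_first_index.saturating_sub(1)
}

// The persisted applied_seq must not exceed the last applied index.
fn check_persisted(existing_op_num: u64, wal_first_index: u64) -> bool {
    existing_op_num <= last_applied_wal_index(wal_first_index)
}
```

Comparing against `wal_first_index` directly would wrongly accept a persisted value one higher than what was actually applied.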

@agourlay
Member Author

Looks like I broke the integration tests somehow 🕵️

@agourlay
Member Author

It seems my debug assertion is incorrect when performing transfers.

ERROR qdrant::startup: Panic occurred in file lib/collection/src/update_workers/applied_seq.rs at line 76: persisted applied_seq:65 cannot be larger than the last_applied_wal_index:0

@IvanPleshkov IvanPleshkov self-requested a review January 27, 2026 11:48
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@lib/collection/src/update_workers/applied_seq.rs`:
- Around line 88-114: The current Err branch uses fs::remove_file(...) followed
by a recursive Self::load_or_init(...) which can recurse indefinitely on
persistent FS failures; change this to limit retries (e.g., perform at most one
non-recursive retry or convert to a small loop with a retry count) instead of a
direct recursive call. Specifically, in the branch that checks
file_was_already_present, after a successful fs::remove_file(&path) do one
controlled retry to initialize (call the same initialization logic used by
Self::load_or_init but without recursion) or loop with a max_attempts counter,
and if it still fails return the fallback Self { file: None, path, op_num:
AtomicU64::new(wal_last_index), update_count } rather than calling
Self::load_or_init(...) again. Ensure you update references to
Self::load_or_init, fs::remove_file, and the file_was_already_present branch
accordingly.
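The bounded-retry shape suggested above can be sketched generically. `try_init` is a hypothetical stand-in for the real file-initialization step (which would also remove the corrupt file between attempts); the point is a loop with a capped attempt count instead of unbounded recursion:

```rust
// Try initialization up to max_attempts times; on persistent failure,
// return the last error so the caller can fall back to an
// in-memory-only handler instead of recursing forever.
fn load_with_retries<F>(mut try_init: F, max_attempts: u32) -> Result<String, String>
where
    F: FnMut() -> Result<String, String>,
{
    let mut last_err = String::from("no attempts made");
    for _ in 0..max_attempts {
        match try_init() {
            Ok(handler) => return Ok(handler),
            // The real code would fs::remove_file the corrupt file here
            // before the next attempt.
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}
```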

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@lib/collection/src/shards/local_shard/mod.rs`:
- Around line 268-271: When initializing AppliedSeqHandler, avoid panicking if
the WAL's last_index is lower than the persisted applied_seq (applied_seq.json).
Either: (1) before calling AppliedSeqHandler::load_or_init, read the persisted
applied sequence from applied_seq.json at shard_path and set wal_last_index =
std::cmp::max(wal_last_index, persisted_applied_seq) so load_or_init never sees
a lower value; or (2) relax validation inside AppliedSeqHandler::load_or_init
(or add an option) to accept wal_last_index < persisted_applied_seq by clamping
internal state to the persisted value and emitting a debug/warn log. Update the
call site around locked_wal.lock().await.last_index() and the
AppliedSeqHandler::load_or_init implementation accordingly.

In `@lib/collection/src/shards/local_shard/snapshot.rs`:
- Around line 188-200: The TOCTOU in snapshot_applied_seq
(applied_seq_path.exists() followed by tar.blocking_append_file) can cause flaky
failures when the file is renamed/removed concurrently; change the logic to
attempt blocking_append_file unconditionally and treat an
io::ErrorKind::NotFound from tar.blocking_append_file as non-fatal (return
Ok(())), while still mapping all other errors to CollectionError::service_error;
reference snapshot_applied_seq, applied_seq_path, APPLIED_SEQ_FILE, and
blocking_append_file when implementing this conditional error handling.

Comment on lines +188 to +200
/// snapshot the applied_seq file
fn snapshot_applied_seq(
    applied_seq_path: PathBuf,
    tar: &tar_ext::BuilderExt,
) -> CollectionResult<()> {
    if applied_seq_path.exists() {
        tar.blocking_append_file(applied_seq_path.as_path(), Path::new(APPLIED_SEQ_FILE))
            .map_err(|err| {
                CollectionError::service_error(format!(
                    "Error while archiving applied_seq: {err}"
                ))
            })?;
    }
    Ok(())
}
Contributor

@coderabbitai coderabbitai bot Jan 27, 2026

⚠️ Potential issue | 🟠 Major


Handle NotFound error when archiving applied_seq to avoid flaky snapshot failures.

The code has a TOCTOU race condition: exists() check can succeed, but the file may disappear between the check and blocking_append_file() call if SaveOnDisk performs an atomic rename on another thread. This is especially likely on Windows where file operations can have brief transient states. Since applied_seq is already optional (wrapped in Option), treat NotFound errors as non-fatal.

💡 Suggested fix
-        if applied_seq_path.exists() {
-            tar.blocking_append_file(applied_seq_path.as_path(), Path::new(APPLIED_SEQ_FILE))
-                .map_err(|err| {
-                    CollectionError::service_error(format!(
-                        "Error while archiving applied_seq: {err}"
-                    ))
-                })?;
-        }
+        if applied_seq_path.exists() {
+            if let Err(err) =
+                tar.blocking_append_file(applied_seq_path.as_path(), Path::new(APPLIED_SEQ_FILE))
+            {
+                if err.kind() != std::io::ErrorKind::NotFound {
+                    return Err(CollectionError::service_error(format!(
+                        "Error while archiving applied_seq: {err}"
+                    )));
+                }
+            }
+        }

Member Author

When do you think the applied_seq file can be deleted?

Contributor



@agourlay, good question! Looking at the code, the applied_seq file can be deleted in these scenarios:

  1. Shard clear operations: In clear_data() (lib/shard/src/files/mod.rs) and when clearing a local shard - the applied_seq file is explicitly removed
  2. Corruption recovery: During load_or_init() (lib/collection/src/update_workers/applied_seq.rs) - if the file fails to load, it's deleted and reinitialized

For the TOCTOU scenario I flagged to actually occur during snapshot, a shard clear operation would need to run concurrently with the snapshot creation. The sequence would be:

  1. exists() check passes inside snapshot_applied_seq()
  2. A concurrent shard clear operation removes the file
  3. blocking_append_file() tries to open the now-missing file → NotFound error

This is a narrower window than I initially implied. SaveOnDisk's atomic rename operations replace the file atomically; they don't delete it temporarily, so normal updates won't trigger this.

The real question is: Can shard clear operations run concurrently with snapshot creation? If snapshot operations have appropriate locking that prevents concurrent shard clears, then my concern doesn't apply and the current code is fine.

What's your take on whether shard clear can happen during snapshot?


🧬 Code Graph Analysis Results

lib/shard/src/files/mod.rs

  • Snippet: applied_seq_path
    • Lines:
      • 23-25
    pub fn applied_seq_path(shard_path: &Path) -> PathBuf {
        shard_path.join(APPLIED_SEQ_FILE)
    }
    

lib/collection/src/shards/local_shard/snapshot.rs

  • Snippet: snapshot_applied_seq
    • Lines:
      • (function definition and body as provided)
    /// snapshot the applied_seq file
    fn snapshot_applied_seq(
        applied_seq_path: PathBuf,
        tar: &tar_ext::BuilderExt,
    ) -> CollectionResult<()> {
        if applied_seq_path.exists() {
            tar.blocking_append_file(applied_seq_path.as_path(), Path::new(APPLIED_SEQ_FILE))
                .map_err(|err| {
                    CollectionError::service_error(format!(
                        "Error while archiving applied_seq: {err}"
                    ))
                })?;
        }
        Ok(())
    }
    

lib/collection/src/shards/local_shard/snapshot.rs

  • Snippet: in snapshot creation, how applied_seq is prepared
    • Lines:
      • (definitions around applied_seq_path and usage inside the thread)
    let applied_seq_path = self.applied_seq_handler.path().to_path_buf();
    
    let handle = tokio::task::spawn_blocking(move || {
        // Do not change segments while snapshotting
        snapshot_all_segments(
            segments.clone(),
            &segments_path,
            Some(segment_config),
            payload_index_schema.clone(),
            &temp_path,
            &tar_c.descend(Path::new(SEGMENTS_PATH))?,
            format,
            manifest.as_ref(),
            update_lock,
        )?;
    
        if save_wal {
            // snapshot all shard's WAL
            Self::snapshot_wal(wal, &tar_c)?;
            // snapshot applied_seq
            Self::snapshot_applied_seq(applied_seq_path, &tar_c)
        } else {
            Self::snapshot_empty_wal(wal, &temp_path, &tar_c)
        }
    });
    


@agourlay agourlay added this to the Update queue milestone Jan 28, 2026
Member

@timvisee timvisee left a comment

Thanks! Looks good 🙌

Sorry for all the back and forth. Happy with the state it's in now.

@agourlay agourlay merged commit 5e986e1 into dev Jan 28, 2026
15 checks passed
@agourlay agourlay deleted the save-applied-seq branch January 28, 2026 10:11
generall pushed a commit that referenced this pull request Feb 9, 2026
* Persist applied seq at regular interval

* Error handling

* fix brain fart

* don't reset update_count

* handles explicitly failure to load

* init from WAL instead of zero

* fixes

* try to regenerate file on corruption

* fix

* trust the WAL if higher watermark most likely from a transfer

* use last WAL index & handles file shard lifecycle

* last WAL index is inclusive

* include applied_seq in partial snapshot

* snapshot applied_seq file

* fix

* remove outdated comment

* Describe AppliedSeq structure, it is used for persisting

---------

Co-authored-by: timvisee <tim@visee.me>