
Fix deadlock during streaming shard snapshot #7241

Merged
timvisee merged 2 commits into dev from fix-streaming-shard-snapshot-deadlock
Sep 11, 2025
Conversation

Contributor

@ffuugoo ffuugoo commented Sep 10, 2025

This PR:

  • refactors telemetry methods on LocalShard to be async and use spawn_blocking, so that the telemetry API won't block the Actix runtime with a blocking read-lock call
  • adds a test for the specific deadlock we've seen in chaos testing
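The failure mode can be sketched outside Qdrant with stdlib Python: `asyncio.to_thread` plays the role of tokio's `spawn_blocking`, and a plain `threading.Lock` stands in for the segment read-write lock that the streaming snapshot holds. All names below are illustrative, not Qdrant APIs.

```python
import asyncio
import threading

lock = threading.Lock()  # stands in for the segment read-write lock


async def version_info() -> str:
    """Cheap handler that must stay responsive (like GET /)."""
    return "ok"


def blocking_telemetry() -> str:
    """Blocking lock acquisition, like LocalShard telemetry before this PR."""
    with lock:
        return "telemetry"


async def main() -> str:
    lock.acquire()  # simulate a streaming snapshot holding the lock

    # Before the fix: calling blocking_telemetry() directly here would block
    # the event-loop thread on the contended lock, stalling every other
    # request. After the fix: offload it, keeping the loop free.
    telemetry_task = asyncio.create_task(asyncio.to_thread(blocking_telemetry))

    # The loop is still free, so this cheap request completes in time.
    result = await asyncio.wait_for(version_info(), timeout=0.5)

    lock.release()  # "snapshot" completes; telemetry can now proceed
    assert await telemetry_task == "telemetry"
    return result


print(asyncio.run(main()))  # prints "ok"
```

If the telemetry call stayed on the event-loop thread instead, `version_info()` would never run and the `wait_for` would time out, mirroring the observed deadlock.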

All Submissions:

  • Contributions should target the dev branch. Did you create your branch from dev?
  • Have you followed the guidelines in our Contributing document?
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?

New Feature Submissions:

  1. Does your submission pass tests?
  2. Have you formatted your code locally using the cargo +nightly fmt --all command prior to submission?
  3. Have you checked your code using the cargo clippy --all --all-features command?

Changes to Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

@ffuugoo ffuugoo marked this pull request as ready for review September 11, 2025 09:37
@ffuugoo ffuugoo requested a review from timvisee September 11, 2025 09:37
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (11)
tests/consensus_tests/test_shard_snapshot_deadlock.py (2)

36-36: Drop unused variable bindings to silence F841 and reduce noise.

-    upsert = executor.submit(upsert_random_points, peer_url, 10)
+    executor.submit(upsert_random_points, peer_url, 10)
-    telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
+    executor.submit(requests.get, f"{peer_url}/telemetry", params={"details_level": "1"}, timeout=5)

Also applies to: 40-40


43-45: Slightly relax the root GET timeout to reduce flakiness on slower CI runners.
0.5s can be tight; 1s still catches deadlocks while avoiding sporadic failures.

-    resp = requests.get(f"{peer_url}/", timeout = 0.5)
+    resp = requests.get(f"{peer_url}/", timeout=1.0)
lib/collection/src/shards/replica_set/telemetry.rs (2)

1-1: Avoid importing Deref as _; prefer idiomatic as_ref()/pattern matching.
The extra import adds cognitive load without benefit here.

-use std::ops::Deref as _;
+// (no need for Deref import)

45-60: (Optional) Apply the same lock-release pattern to get_telemetry_data above for consistency.
Not part of this hunk, but the same risk exists there.

lib/collection/src/shards/shard.rs (1)

76-86: Telemetry.status is set only for Local shards; proxies return None.

Calls via Shard::Proxy/ForwardProxy/QueueProxy return telemetry without status, unlike the Local path. Either set status for all variants here, or set it inside each proxy’s get_telemetry_data to keep behavior consistent.

Would you like a patch to set status in proxy and queue_proxy shards’ get_telemetry_data?

lib/collection/src/shards/proxy_shard.rs (1)

148-150: Also populate telemetry.status here to avoid None when called directly.

Set status from the wrapped local shard so direct ProxyShard calls produce complete telemetry (parity with Shard::Local path).

Apply:

 pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
-    self.wrapped_shard.get_telemetry_data(detail).await
+    let mut t = self.wrapped_shard.get_telemetry_data(detail).await;
+    t.status = Some(self.wrapped_shard.local_shard_status().await.0);
+    t
 }
lib/collection/src/shards/queue_proxy_shard.rs (1)

190-195: Mirror Local path: populate telemetry.status for queue proxy too.

Avoid returning telemetry without status when using QueueProxyShard directly.

Apply:

 pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
-    self.inner_unchecked()
-        .wrapped_shard
-        .get_telemetry_data(detail)
-        .await
+    let local = &self.inner_unchecked().wrapped_shard;
+    let mut t = local.get_telemetry_data(detail).await;
+    t.status = Some(local.local_shard_status().await.0);
+    t
 }
lib/collection/src/shards/local_shard/telemetry.rs (4)

15-27: Avoid taking the read-lock and spawning when detail < Level4.

You still acquire a blocking read-lock (and spawn) even when Level < 4. Short-circuit to skip both the spawn and the lock; also handle JoinError inline to avoid the extra unwrap/log block.

Apply:

-        let segments = self.segments.clone();
-
-        let segments = tokio::task::spawn_blocking(move || {
-            let segments = segments.read();
-
-            if detail.level >= DetailsLevel::Level4 {
-                segments
-                    .iter()
-                    .map(|(_id, segment)| segment.get().read().get_telemetry_data(detail))
-                    .collect()
-            } else {
-                vec![]
-            }
-        })
-        .await;
-
-        if let Err(err) = &segments {
-            log::error!("Failed to get telemetry: {err}");
-        }
-
-        let segments = segments.unwrap_or_default();
+        let segments = if detail.level >= DetailsLevel::Level4 {
+            let segments = self.segments.clone();
+            match tokio::task::spawn_blocking(move || {
+                let segments = segments.read();
+                segments
+                    .iter()
+                    .map(|(_id, segment)| segment.get().read().get_telemetry_data(detail))
+                    .collect::<Vec<_>>()
+            })
+            .await
+            {
+                Ok(v) => v,
+                Err(err) => {
+                    log::error!("failed to get telemetry: {err}");
+                    Vec::new()
+                }
+            }
+        } else {
+            Vec::new()
+        };

Also applies to: 29-34


48-56: Run status and size-stats concurrently to cut tail latency.

Both are independent blocking tasks; join them.

-        let status = self.get_optimization_status().await;
-
-        let SizeStats {
-            num_vectors,
-            vectors_size_bytes,
-            payloads_size_bytes,
-            num_points,
-        } = self.get_size_stats().await;
+        let (status, size_stats) =
+            tokio::join!(self.get_optimization_status(), self.get_size_stats());
+        let SizeStats {
+            num_vectors,
+            vectors_size_bytes,
+            payloads_size_bytes,
+            num_points,
+        } = size_stats;

80-97: Minor: prefer Arc::clone and unify error message casing.

Tiny readability/style nits: favor Arc::clone(&self.segments) and keep error-case consistent with other logs (“failed …”).

-        let segments = self.segments.clone();
+        let segments = std::sync::Arc::clone(&self.segments);
@@
-            Err(err) => OptimizersStatus::Error(format!("Failed to get optimizers status: {err}")),
+            Err(err) => OptimizersStatus::Error(format!("failed to get optimizers status: {err}")),

Add once at top (outside this hunk) if you prefer the shorter form:

use std::sync::Arc;

99-134: Unify error handling style (match on await result).

Mirror the pattern used above to keep logging and fallback localized.

-        let stats = tokio::task::spawn_blocking(move || {
+        let stats = tokio::task::spawn_blocking(move || {
             // ...
-        })
-        .await;
-
-        if let Err(err) = &stats {
-            log::error!("failed to get size stats: {err}");
-        }
-
-        stats.unwrap_or_default()
+        })
+        .await
+        .map_err(|err| {
+            log::error!("failed to get size stats: {err}");
+            err
+        })
+        .unwrap_or_default()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b29e59a and fabf579.

📒 Files selected for processing (7)
  • lib/collection/src/shards/forward_proxy_shard.rs (1 hunks)
  • lib/collection/src/shards/local_shard/telemetry.rs (4 hunks)
  • lib/collection/src/shards/proxy_shard.rs (1 hunks)
  • lib/collection/src/shards/queue_proxy_shard.rs (1 hunks)
  • lib/collection/src/shards/replica_set/telemetry.rs (2 hunks)
  • lib/collection/src/shards/shard.rs (1 hunks)
  • tests/consensus_tests/test_shard_snapshot_deadlock.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (.github/review-rules.md)

**/*.rs: Prefer explicit SomeType::from(x) over implicit x.into() in Rust code
Do not use transmute_from_u8, transmute_to_u8, transmute_from_u8_to_slice, transmute_from_u8_to_mut_slice, transmute_to_u8_slice in new code; use bytemuck or zerocopy instead

Files:

  • lib/collection/src/shards/shard.rs
  • lib/collection/src/shards/forward_proxy_shard.rs
  • lib/collection/src/shards/proxy_shard.rs
  • lib/collection/src/shards/local_shard/telemetry.rs
  • lib/collection/src/shards/queue_proxy_shard.rs
  • lib/collection/src/shards/replica_set/telemetry.rs
**/src/**/*.rs

📄 CodeRabbit inference engine (.github/review-rules.md)

**/src/**/*.rs: Prefer exhaustive match arms over a catch-all _ arm to avoid missing new enum variants (except in tests/benchmarks or when provably safe)
Prefer explicit field ignoring with : _ over .. in struct patterns (except in tests/benchmarks or when provably safe)

Files:

  • lib/collection/src/shards/shard.rs
  • lib/collection/src/shards/forward_proxy_shard.rs
  • lib/collection/src/shards/proxy_shard.rs
  • lib/collection/src/shards/local_shard/telemetry.rs
  • lib/collection/src/shards/queue_proxy_shard.rs
  • lib/collection/src/shards/replica_set/telemetry.rs
🧬 Code graph analysis (7)
lib/collection/src/shards/shard.rs (7)
src/actix/api/service_api.rs (1)
  • telemetry (36-59)
lib/collection/src/shards/forward_proxy_shard.rs (2)
  • get_optimization_status (349-351)
  • get_size_stats (353-355)
lib/collection/src/shards/local_shard/telemetry.rs (2)
  • get_optimization_status (80-97)
  • get_size_stats (99-134)
lib/collection/src/shards/proxy_shard.rs (2)
  • get_optimization_status (152-154)
  • get_size_stats (156-158)
lib/collection/src/shards/queue_proxy_shard.rs (2)
  • get_optimization_status (197-202)
  • get_size_stats (204-206)
lib/collection/src/shards/replica_set/telemetry.rs (2)
  • get_optimization_status (42-50)
  • get_size_stats (52-60)
lib/collection/src/shards/dummy_shard.rs (2)
  • get_optimization_status (76-78)
  • get_size_stats (80-82)
lib/collection/src/shards/forward_proxy_shard.rs (4)
lib/collection/src/shards/local_shard/telemetry.rs (3)
  • get_telemetry_data (12-78)
  • get_optimization_status (80-97)
  • get_size_stats (99-134)
lib/collection/src/shards/proxy_shard.rs (3)
  • get_telemetry_data (148-150)
  • get_optimization_status (152-154)
  • get_size_stats (156-158)
lib/collection/src/shards/queue_proxy_shard.rs (3)
  • get_telemetry_data (190-195)
  • get_optimization_status (197-202)
  • get_size_stats (204-206)
lib/collection/src/shards/shard.rs (3)
  • get_telemetry_data (73-90)
  • get_optimization_status (92-102)
  • get_size_stats (104-112)
lib/collection/src/shards/proxy_shard.rs (4)
lib/collection/src/shards/forward_proxy_shard.rs (3)
  • get_telemetry_data (345-347)
  • get_optimization_status (349-351)
  • get_size_stats (353-355)
lib/collection/src/shards/local_shard/telemetry.rs (3)
  • get_telemetry_data (12-78)
  • get_optimization_status (80-97)
  • get_size_stats (99-134)
lib/collection/src/shards/queue_proxy_shard.rs (3)
  • get_telemetry_data (190-195)
  • get_optimization_status (197-202)
  • get_size_stats (204-206)
lib/collection/src/shards/shard.rs (3)
  • get_telemetry_data (73-90)
  • get_optimization_status (92-102)
  • get_size_stats (104-112)
lib/collection/src/shards/local_shard/telemetry.rs (7)
lib/collection/src/shards/forward_proxy_shard.rs (4)
  • get_telemetry_data (345-347)
  • get_optimization_status (349-351)
  • get_size_stats (353-355)
  • info (503-506)
lib/collection/src/shards/proxy_shard.rs (4)
  • get_telemetry_data (148-150)
  • get_optimization_status (152-154)
  • get_size_stats (156-158)
  • info (265-268)
lib/collection/src/shards/queue_proxy_shard.rs (5)
  • get_telemetry_data (190-195)
  • get_optimization_status (197-202)
  • get_size_stats (204-206)
  • info (300-302)
  • info (624-627)
lib/collection/src/shards/replica_set/telemetry.rs (3)
  • get_telemetry_data (11-40)
  • get_optimization_status (42-50)
  • get_size_stats (52-60)
lib/collection/src/shards/shard.rs (3)
  • get_telemetry_data (73-90)
  • get_optimization_status (92-102)
  • get_size_stats (104-112)
lib/collection/src/collection/mod.rs (1)
  • get_telemetry_data (782-815)
lib/collection/src/shards/local_shard/mod.rs (1)
  • segments (252-254)
lib/collection/src/shards/queue_proxy_shard.rs (3)
lib/collection/src/shards/forward_proxy_shard.rs (3)
  • get_telemetry_data (345-347)
  • get_optimization_status (349-351)
  • get_size_stats (353-355)
lib/collection/src/shards/proxy_shard.rs (3)
  • get_telemetry_data (148-150)
  • get_optimization_status (152-154)
  • get_size_stats (156-158)
lib/collection/src/shards/shard.rs (3)
  • get_telemetry_data (73-90)
  • get_optimization_status (92-102)
  • get_size_stats (104-112)
lib/collection/src/shards/replica_set/telemetry.rs (6)
lib/collection/src/shards/forward_proxy_shard.rs (1)
  • get_size_stats (353-355)
lib/collection/src/shards/local_shard/telemetry.rs (1)
  • get_size_stats (99-134)
lib/collection/src/shards/proxy_shard.rs (1)
  • get_size_stats (156-158)
lib/collection/src/shards/queue_proxy_shard.rs (1)
  • get_size_stats (204-206)
lib/collection/src/shards/shard.rs (1)
  • get_size_stats (104-112)
lib/collection/src/shards/dummy_shard.rs (1)
  • get_size_stats (80-82)
tests/consensus_tests/test_shard_snapshot_deadlock.py (5)
src/actix/helpers.rs (1)
  • time (141-147)
tests/consensus_tests/utils.py (2)
  • assert_project_root (110-113)
  • start_cluster (220-245)
tests/consensus_tests/fixtures.py (1)
  • upsert_random_points (83-129)
src/actix/api/service_api.rs (1)
  • telemetry (36-59)
tests/consensus_tests/assertions.py (1)
  • assert_http_ok (6-13)
🪛 Ruff (0.12.2)
tests/consensus_tests/test_shard_snapshot_deadlock.py

6-6: from .assertions import * used; unable to detect undefined names

(F403)


7-7: from .fixtures import * used; unable to detect undefined names

(F403)


8-8: from .utils import * used; unable to detect undefined names

(F403)


12-12: assert_project_root may be undefined, or defined from star imports

(F405)


15-15: start_cluster may be undefined, or defined from star imports

(F405)


19-19: create_collection may be undefined, or defined from star imports

(F405)


22-22: Probable use of requests call without timeout

(S113)


30-30: Local variable snapshot_chunk is assigned to but never used

Remove assignment to unused variable snapshot_chunk

(F841)


36-36: Local variable upsert is assigned to but never used

Remove assignment to unused variable upsert

(F841)


36-36: upsert_random_points may be undefined, or defined from star imports

(F405)


40-40: Local variable telemetry is assigned to but never used

Remove assignment to unused variable telemetry

(F841)


45-45: assert_http_ok may be undefined, or defined from star imports

(F405)

🔇 Additional comments (10)
lib/collection/src/shards/replica_set/telemetry.rs (1)

45-60: Verify LocalShard is Clone or stored behind Arc

  • lib/collection/src/shards/local_shard/mod.rs:89 defines LocalShard; repository search shows no derive/impl Clone or Option<Arc>.
  • Action: either derive/impl Clone for LocalShard, or store it as Arc (e.g., change self.local to Option<Arc>) and clone the Arc at call sites.
lib/collection/src/shards/forward_proxy_shard.rs (1)

345-355: LGTM: async forwarding avoids blocking and aligns with the new async telemetry API.
Straightforward awaits on the wrapped shard; consistent with analogous methods elsewhere.

lib/collection/src/shards/shard.rs (2)

92-101: Async delegation looks good.

Async signature + awaited delegation across variants is correct.


104-111: Async delegation looks good.

Consistent awaited forwarding for size stats.

lib/collection/src/shards/proxy_shard.rs (2)

152-154: Async delegation LGTM.


156-158: Async delegation LGTM.

lib/collection/src/shards/queue_proxy_shard.rs (2)

197-202: Async delegation LGTM.


204-205: Async delegation LGTM.

lib/collection/src/shards/local_shard/telemetry.rs (2)

12-78: Good async offload; fixes the deadlock vector.

Moving sync RwLock reads into spawn_blocking and awaiting from async callers is the right call for Actix/Tokio. The construction of LocalShardTelemetry looks consistent with Shard::get_telemetry_data post-processing.


12-78: Sanity-check async telemetry call sites — ensure .await is used

Sandbox search returned no matches (lib/collection not present in this checkout). Run locally to find un-awaited calls:

rg -nP --hidden -S '\bget_(telemetry_data|optimization_status|size_stats)\s*\(' -g '!**/target/**' -g '!**/node_modules/**' -g '!**/.git/**' | grep -vF '.await'

File to check: lib/collection/src/shards/local_shard/telemetry.rs — verify ranges 12-78, 80-97, 99-134.

Comment on lines +45 to 50
let Some(local) = local_shard.deref() else {
return None;
};

Some(local.get_optimization_status().await)
}

🛠️ Refactor suggestion

Do not hold the self.local read-lock across .await; clone handle and drop the guard first.
Holding a tokio::RwLock read guard across an await can block writers (e.g., during reconfiguration), risking latency spikes or deadlocks. Clone/extract the shard, release the lock, then await.

-        let Some(local) = local_shard.deref() else {
-            return None;
-        };
-
-        Some(local.get_optimization_status().await)
+        // Extract local shard, drop the lock, then await.
+        let local = local_shard.clone();
+        drop(local_shard);
+        let Some(local) = local else { return None; };
+        Some(local.get_optimization_status().await)

If LocalShard isn’t Clone, consider storing it behind Arc (e.g., Option<Arc<LocalShard>>) and cloning the Arc here.

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +55 to 60
let Some(local) = local_shard.deref() else {
return SizeStats::default();
};

local.get_size_stats().await
}

🛠️ Refactor suggestion

Same: release self.local read-lock before awaiting get_size_stats().

-        let Some(local) = local_shard.deref() else {
-            return SizeStats::default();
-        };
-
-        local.get_size_stats().await
+        let local = local_shard.clone();
+        drop(local_shard);
+        match local {
+            Some(local) => local.get_size_stats().await,
+            None => SizeStats::default(),
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let Some(local) = local_shard.deref() else {
return SizeStats::default();
};
local.get_size_stats().await
}
let local = local_shard.clone();
drop(local_shard);
match local {
Some(local) => local.get_size_stats().await,
None => SizeStats::default(),
}
}

Comment on lines +6 to +8
from .assertions import *
from .fixtures import *
from .utils import *

🛠️ Refactor suggestion

Replace star imports with explicit imports to satisfy Ruff and avoid namespace ambiguity.
Prevents F403/F405 and clarifies dependencies.

-from .assertions import *
-from .fixtures import *
-from .utils import *
+from .assertions import assert_http_ok
+from .fixtures import create_collection, upsert_random_points
+from .utils import assert_project_root, start_cluster

Committable suggestion skipped: line range outside the PR's diff.


🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 6 to 8,
the file currently uses three wildcard imports (from .assertions import *, from
.fixtures import *, from .utils import *); replace these star imports with
explicit named imports from each module: inspect the test file to identify
exactly which functions/classes/constants are referenced from .assertions,
.fixtures and .utils, then change each line to import only those symbols (e.g.
from .assertions import assert_x, assert_y), run ruff/pytest to ensure no
F403/F405 errors remain, and remove any unused imports found.

create_collection(peer_url)

# Request streaming shard snapshot
snapshot = requests.get(f"{peer_url}/collections/test_collection/shards/0/snapshot", stream = True)

🛠️ Refactor suggestion

Add a timeout to the snapshot request to avoid indefinite hangs in CI.
Even though the stream is intentionally left open, a finite timeout guards against stalls before the first chunk.

-    snapshot = requests.get(f"{peer_url}/collections/test_collection/shards/0/snapshot", stream = True)
+    snapshot = requests.get(
+        f"{peer_url}/collections/test_collection/shards/0/snapshot",
+        stream=True,
+        timeout=5,
+    )
📝 Committable suggestion


Suggested change
snapshot = requests.get(f"{peer_url}/collections/test_collection/shards/0/snapshot", stream = True)
snapshot = requests.get(
f"{peer_url}/collections/test_collection/shards/0/snapshot",
stream=True,
timeout=5,
)

🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around line 22, the
requests.get call for the shard snapshot uses stream=True but lacks a timeout
which can cause indefinite hangs in CI; add a timeout parameter (e.g.,
timeout=(5, 30) or a single timeout like timeout=10) to the requests.get
invocation so the call will fail fast if the connection or first read stalls
while preserving stream=True behavior.

Comment on lines +27 to +31
# Read only the first response chunk, but don't read response to completion. This ensures
# that snapshot processing started, but makes request "hang" while holding segment read-lock.
snapshot_stream = snapshot.iter_content(None)
snapshot_chunk = next(snapshot_stream)


🛠️ Refactor suggestion

Read a minimal chunk and assert progress; current None chunk size may block too long.
Using a 1-byte chunk yields the lock quickly and reduces flakiness; also assert non-empty chunk.

-    snapshot_stream = snapshot.iter_content(None)
-    snapshot_chunk = next(snapshot_stream)
+    snapshot_stream = snapshot.iter_content(chunk_size=1)
+    first_chunk = next(snapshot_stream)
+    assert first_chunk, "Snapshot stream produced an empty first chunk"
📝 Committable suggestion


Suggested change
# Read only the first response chunk, but don't read response to completion. This ensures
# that snapshot processing started, but makes request "hang" while holding segment read-lock.
snapshot_stream = snapshot.iter_content(None)
snapshot_chunk = next(snapshot_stream)
# Read only the first response chunk, but don't read response to completion. This ensures
# that snapshot processing started, but makes request "hang" while holding segment read-lock.
snapshot_stream = snapshot.iter_content(chunk_size=1)
first_chunk = next(snapshot_stream)
assert first_chunk, "Snapshot stream produced an empty first chunk"

🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 27 to 31,
the test currently calls snapshot.iter_content(None) which can block too long;
change to snapshot.iter_content(1) to read a 1-byte chunk so the segment
read-lock is acquired quickly, then assert the returned snapshot_chunk is
non-empty (e.g., assert snapshot_chunk) to ensure progress was made.

Comment on lines +32 to +41
# Run background executor to send blocking requests without blocking the test
executor = ThreadPoolExecutor(max_workers = 2)

# Upsert a point, to block on segment write-lock
upsert = executor.submit(upsert_random_points, peer_url, 10)
time.sleep(0.1)

# Get telemetry, to block on segment read-lock, which would block Actix worker
telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
time.sleep(0.1)

🛠️ Refactor suggestion

Ensure resources are cleaned up: auto-shutdown the executor and close the snapshot stream.
Prevents thread leakage and long-lived open HTTP connections across test runs.

-    # Run background executor to send blocking requests without blocking the test
-    executor = ThreadPoolExecutor(max_workers = 2)
-
-    # Upsert a point, to block on segment write-lock
-    upsert = executor.submit(upsert_random_points, peer_url, 10)
-    time.sleep(0.1)
-
-    # Get telemetry, to block on segment read-lock, which would block Actix worker
-    telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
-    time.sleep(0.1)
-
-    # Try to query Qdrant version info, which would block and timeout if Actix worker is blocked
-    resp = requests.get(f"{peer_url}/", timeout = 0.5)
-    assert_http_ok(resp)
+    # Run background executor to send blocking requests without blocking the test
+    try:
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            # Upsert a point, to block on segment write-lock
+            executor.submit(upsert_random_points, peer_url, 10)
+            time.sleep(0.1)
+
+            # Get telemetry, to block on segment read-lock, which would block Actix worker
+            executor.submit(
+                requests.get,
+                f"{peer_url}/telemetry",
+                params={"details_level": "1"},
+                timeout=5,
+            )
+            time.sleep(0.1)
+
+            # Try to query Qdrant version info, which would block and timeout if Actix worker is blocked
+            resp = requests.get(f"{peer_url}/", timeout=1.0)
+            assert_http_ok(resp)
+    finally:
+        snapshot.close()

Also applies to: 45-45


🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 32-41 (and
also apply same fix at line 45), the ThreadPoolExecutor and the HTTP
snapshot/telemetry response are left open causing thread leakage and lingering
connections; ensure the executor is shut down and any requests response/stream
is closed: capture the Future/response objects, call
executor.shutdown(wait=True) after the background tasks complete (or use a
context manager) and explicitly close response objects (response.close() or use
with requests.get(...) as resp) for the snapshot/telemetry calls so all threads
and HTTP connections are cleaned up.

@timvisee timvisee requested a review from tellet-q September 11, 2025 11:26
@qdrant qdrant deleted a comment from coderabbitai bot Sep 11, 2025
Member

@timvisee timvisee left a comment


Nice!

Comment on lines -57 to +69
segments: (!segments.is_empty()).then_some(segments),
segments: if segments.is_empty() {
None
} else {
Some(segments)
},

🥲

@timvisee timvisee merged commit a3ce716 into dev Sep 11, 2025
16 checks passed
@timvisee timvisee deleted the fix-streaming-shard-snapshot-deadlock branch September 11, 2025 12:55
timvisee pushed a commit that referenced this pull request Sep 29, 2025
* Add `test_shard_snapshot_deadlock` integration test

* Fix deadlock during streaming shard snapshot
@timvisee timvisee mentioned this pull request Sep 29, 2025