Fix deadlock during streaming shard snapshot #7241
Conversation
Actionable comments posted: 6
🧹 Nitpick comments (11)
tests/consensus_tests/test_shard_snapshot_deadlock.py (2)
36-36: Drop unused variable bindings to silence F841 and reduce noise.

```diff
- upsert = executor.submit(upsert_random_points, peer_url, 10)
+ executor.submit(upsert_random_points, peer_url, 10)

- telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
+ executor.submit(requests.get, f"{peer_url}/telemetry", params={"details_level": "1"}, timeout=5)
```

Also applies to: 40-40
43-45: Slightly relax the root GET timeout to reduce flakiness on slower CI runners.
0.5s can be tight; 1s still catches deadlocks while avoiding sporadic failures.

```diff
- resp = requests.get(f"{peer_url}/", timeout = 0.5)
+ resp = requests.get(f"{peer_url}/", timeout=1.0)
```

lib/collection/src/shards/replica_set/telemetry.rs (2)
1-1: Avoid importing `Deref as _`; prefer idiomatic `as_ref()`/pattern matching.
The extra import adds cognitive load without benefit here.

```diff
-use std::ops::Deref as _;
+// (no need for Deref import)
```
45-60: (Optional) Apply the same lock-release pattern to `get_telemetry_data` above for consistency.
Not part of this hunk, but the same risk exists there.

lib/collection/src/shards/shard.rs (1)
76-86: Telemetry.status is set only for Local shards; proxies return None. Calls via Shard::Proxy/ForwardProxy/QueueProxy return telemetry without status, unlike the Local path. Either set status for all variants here, or set it inside each proxy's get_telemetry_data to keep behavior consistent.
Would you like a patch to set status in proxy and queue_proxy shards’ get_telemetry_data?
lib/collection/src/shards/proxy_shard.rs (1)
148-150: Also populate telemetry.status here to avoid None when called directly. Set status from the wrapped local shard so direct ProxyShard calls produce complete telemetry (parity with Shard::Local path).
Apply:
```diff
 pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
-    self.wrapped_shard.get_telemetry_data(detail).await
+    let mut t = self.wrapped_shard.get_telemetry_data(detail).await;
+    t.status = Some(self.wrapped_shard.local_shard_status().await.0);
+    t
 }
```
190-195: Mirror Local path: populate telemetry.status for queue proxy too. Avoid returning telemetry without status when using QueueProxyShard directly.
Apply:
```diff
 pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
-    self.inner_unchecked()
-        .wrapped_shard
-        .get_telemetry_data(detail)
-        .await
+    let local = &self.inner_unchecked().wrapped_shard;
+    let mut t = local.get_telemetry_data(detail).await;
+    t.status = Some(local.local_shard_status().await.0);
+    t
 }
```
15-27: Avoid taking the read-lock and spawning when detail < Level4. You still acquire a blocking read-lock (and spawn) even when the level is below 4. Short-circuit to skip both the spawn and the lock; also handle JoinError inline to avoid the extra unwrap/log block.
Apply:
```diff
-        let segments = self.segments.clone();
-
-        let segments = tokio::task::spawn_blocking(move || {
-            let segments = segments.read();
-
-            if detail.level >= DetailsLevel::Level4 {
-                segments
-                    .iter()
-                    .map(|(_id, segment)| segment.get().read().get_telemetry_data(detail))
-                    .collect()
-            } else {
-                vec![]
-            }
-        })
-        .await;
-
-        if let Err(err) = &segments {
-            log::error!("Failed to get telemetry: {err}");
-        }
-
-        let segments = segments.unwrap_or_default();
+        let segments = if detail.level >= DetailsLevel::Level4 {
+            let segments = self.segments.clone();
+            match tokio::task::spawn_blocking(move || {
+                let segments = segments.read();
+                segments
+                    .iter()
+                    .map(|(_id, segment)| segment.get().read().get_telemetry_data(detail))
+                    .collect::<Vec<_>>()
+            })
+            .await
+            {
+                Ok(v) => v,
+                Err(err) => {
+                    log::error!("failed to get telemetry: {err}");
+                    Vec::new()
+                }
+            }
+        } else {
+            Vec::new()
+        };
```

Also applies to: 29-34
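The short-circuit shape suggested above can be sketched outside Rust. Below is a minimal Python analog (toy names and data, not Qdrant's API), using `run_in_executor` in place of `spawn_blocking`: when the requested detail level is below the threshold, neither the thread-pool hop nor the blocking scan happens.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

LEVEL4 = 4

def collect_segment_telemetry(segments):
    # Stand-in for the blocking read-lock + per-segment scan.
    return [{"id": sid, "points": n} for sid, n in segments.items()]

async def segment_telemetry(segments, detail_level, executor):
    if detail_level < LEVEL4:
        return []  # short-circuit: no offload, no lock
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, collect_segment_telemetry, segments)

async def main():
    segments = {0: 10, 1: 20}
    with ThreadPoolExecutor() as executor:
        low = await segment_telemetry(segments, 1, executor)
        high = await segment_telemetry(segments, LEVEL4, executor)
    return low, high

low, high = asyncio.run(main())
print(low, high)
```

The same structure carries over: the cheap path returns immediately on the async side, and only the expensive path pays for the offload.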
48-56: Run status and size-stats concurrently to cut tail latency. Both are independent blocking tasks; join them.
```diff
-        let status = self.get_optimization_status().await;
-
-        let SizeStats {
-            num_vectors,
-            vectors_size_bytes,
-            payloads_size_bytes,
-            num_points,
-        } = self.get_size_stats().await;
+        let (status, size_stats) =
+            tokio::join!(self.get_optimization_status(), self.get_size_stats());
+        let SizeStats {
+            num_vectors,
+            vectors_size_bytes,
+            payloads_size_bytes,
+            num_points,
+        } = size_stats;
```
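For intuition on the latency claim, here is a hedged Python analog of the join suggestion (illustrative sleeps, not real Qdrant latencies), with `asyncio.gather` standing in for `tokio::join!`:

```python
import asyncio
import time

async def get_optimization_status():
    await asyncio.sleep(0.2)  # pretend blocking sub-query
    return "ok"

async def get_size_stats():
    await asyncio.sleep(0.2)  # independent sub-query of similar cost
    return {"num_points": 100}

async def sequential():
    t0 = time.monotonic()
    await get_optimization_status()
    await get_size_stats()
    return time.monotonic() - t0  # ~ sum of both latencies

async def joined():
    t0 = time.monotonic()
    await asyncio.gather(get_optimization_status(), get_size_stats())
    return time.monotonic() - t0  # ~ max of both latencies

seq = asyncio.run(sequential())
par = asyncio.run(joined())
print(round(seq, 2), round(par, 2))
```

Since the two queries are independent, the joined version is bounded by the slower of the two instead of their sum.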
80-97: Minor: prefer Arc::clone and unify error message casing. Tiny readability/style nits: favor Arc::clone(&self.segments) and keep the error case consistent with other logs ("failed …").
```diff
-        let segments = self.segments.clone();
+        let segments = std::sync::Arc::clone(&self.segments);
@@
-            Err(err) => OptimizersStatus::Error(format!("failed to get optimizers status: {err}")),
+            Err(err) => OptimizersStatus::Error(format!("failed to get optimizers status: {err}")),
```

Add once at top (outside this hunk) if you prefer the shorter form:
use std::sync::Arc;
99-134: Unify error handling style (match on await result). Mirror the pattern used above to keep logging and fallback localized.
```diff
-        let stats = tokio::task::spawn_blocking(move || {
+        let stats = tokio::task::spawn_blocking(move || {
             // ...
-        })
-        .await;
-
-        if let Err(err) = &stats {
-            log::error!("failed to get size stats: {err}");
-        }
-
-        stats.unwrap_or_default()
+        })
+        .await
+        .map_err(|err| {
+            log::error!("failed to get size stats: {err}");
+            err
+        })
+        .unwrap_or_default()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- lib/collection/src/shards/forward_proxy_shard.rs (1 hunks)
- lib/collection/src/shards/local_shard/telemetry.rs (4 hunks)
- lib/collection/src/shards/proxy_shard.rs (1 hunks)
- lib/collection/src/shards/queue_proxy_shard.rs (1 hunks)
- lib/collection/src/shards/replica_set/telemetry.rs (2 hunks)
- lib/collection/src/shards/shard.rs (1 hunks)
- tests/consensus_tests/test_shard_snapshot_deadlock.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs
📄 CodeRabbit inference engine (.github/review-rules.md)
**/*.rs: Prefer explicit SomeType::from(x) over implicit x.into() in Rust code
Do not use transmute_from_u8, transmute_to_u8, transmute_from_u8_to_slice, transmute_from_u8_to_mut_slice, transmute_to_u8_slice in new code; use bytemuck or zerocopy instead
Files:
- lib/collection/src/shards/shard.rs
- lib/collection/src/shards/forward_proxy_shard.rs
- lib/collection/src/shards/proxy_shard.rs
- lib/collection/src/shards/local_shard/telemetry.rs
- lib/collection/src/shards/queue_proxy_shard.rs
- lib/collection/src/shards/replica_set/telemetry.rs
**/src/**/*.rs
📄 CodeRabbit inference engine (.github/review-rules.md)
**/src/**/*.rs: Prefer exhaustive match arms over a catch-all _ arm to avoid missing new enum variants (except in tests/benchmarks or when provably safe)
Prefer explicit field ignoring with : _ over .. in struct patterns (except in tests/benchmarks or when provably safe)
Files:
- lib/collection/src/shards/shard.rs
- lib/collection/src/shards/forward_proxy_shard.rs
- lib/collection/src/shards/proxy_shard.rs
- lib/collection/src/shards/local_shard/telemetry.rs
- lib/collection/src/shards/queue_proxy_shard.rs
- lib/collection/src/shards/replica_set/telemetry.rs
🧬 Code graph analysis (7)
lib/collection/src/shards/shard.rs (7)

- src/actix/api/service_api.rs (1): telemetry (36-59)
- lib/collection/src/shards/forward_proxy_shard.rs (2): get_optimization_status (349-351), get_size_stats (353-355)
- lib/collection/src/shards/local_shard/telemetry.rs (2): get_optimization_status (80-97), get_size_stats (99-134)
- lib/collection/src/shards/proxy_shard.rs (2): get_optimization_status (152-154), get_size_stats (156-158)
- lib/collection/src/shards/queue_proxy_shard.rs (2): get_optimization_status (197-202), get_size_stats (204-206)
- lib/collection/src/shards/replica_set/telemetry.rs (2): get_optimization_status (42-50), get_size_stats (52-60)
- lib/collection/src/shards/dummy_shard.rs (2): get_optimization_status (76-78), get_size_stats (80-82)
lib/collection/src/shards/forward_proxy_shard.rs (4)

- lib/collection/src/shards/local_shard/telemetry.rs (3): get_telemetry_data (12-78), get_optimization_status (80-97), get_size_stats (99-134)
- lib/collection/src/shards/proxy_shard.rs (3): get_telemetry_data (148-150), get_optimization_status (152-154), get_size_stats (156-158)
- lib/collection/src/shards/queue_proxy_shard.rs (3): get_telemetry_data (190-195), get_optimization_status (197-202), get_size_stats (204-206)
- lib/collection/src/shards/shard.rs (3): get_telemetry_data (73-90), get_optimization_status (92-102), get_size_stats (104-112)
lib/collection/src/shards/proxy_shard.rs (4)

- lib/collection/src/shards/forward_proxy_shard.rs (3): get_telemetry_data (345-347), get_optimization_status (349-351), get_size_stats (353-355)
- lib/collection/src/shards/local_shard/telemetry.rs (3): get_telemetry_data (12-78), get_optimization_status (80-97), get_size_stats (99-134)
- lib/collection/src/shards/queue_proxy_shard.rs (3): get_telemetry_data (190-195), get_optimization_status (197-202), get_size_stats (204-206)
- lib/collection/src/shards/shard.rs (3): get_telemetry_data (73-90), get_optimization_status (92-102), get_size_stats (104-112)
lib/collection/src/shards/local_shard/telemetry.rs (7)

- lib/collection/src/shards/forward_proxy_shard.rs (4): get_telemetry_data (345-347), get_optimization_status (349-351), get_size_stats (353-355), info (503-506)
- lib/collection/src/shards/proxy_shard.rs (4): get_telemetry_data (148-150), get_optimization_status (152-154), get_size_stats (156-158), info (265-268)
- lib/collection/src/shards/queue_proxy_shard.rs (5): get_telemetry_data (190-195), get_optimization_status (197-202), get_size_stats (204-206), info (300-302), info (624-627)
- lib/collection/src/shards/replica_set/telemetry.rs (3): get_telemetry_data (11-40), get_optimization_status (42-50), get_size_stats (52-60)
- lib/collection/src/shards/shard.rs (3): get_telemetry_data (73-90), get_optimization_status (92-102), get_size_stats (104-112)
- lib/collection/src/collection/mod.rs (1): get_telemetry_data (782-815)
- lib/collection/src/shards/local_shard/mod.rs (1): segments (252-254)
lib/collection/src/shards/queue_proxy_shard.rs (3)

- lib/collection/src/shards/forward_proxy_shard.rs (3): get_telemetry_data (345-347), get_optimization_status (349-351), get_size_stats (353-355)
- lib/collection/src/shards/proxy_shard.rs (3): get_telemetry_data (148-150), get_optimization_status (152-154), get_size_stats (156-158)
- lib/collection/src/shards/shard.rs (3): get_telemetry_data (73-90), get_optimization_status (92-102), get_size_stats (104-112)
lib/collection/src/shards/replica_set/telemetry.rs (6)

- lib/collection/src/shards/forward_proxy_shard.rs (1): get_size_stats (353-355)
- lib/collection/src/shards/local_shard/telemetry.rs (1): get_size_stats (99-134)
- lib/collection/src/shards/proxy_shard.rs (1): get_size_stats (156-158)
- lib/collection/src/shards/queue_proxy_shard.rs (1): get_size_stats (204-206)
- lib/collection/src/shards/shard.rs (1): get_size_stats (104-112)
- lib/collection/src/shards/dummy_shard.rs (1): get_size_stats (80-82)
tests/consensus_tests/test_shard_snapshot_deadlock.py (5)

- src/actix/helpers.rs (1): time (141-147)
- tests/consensus_tests/utils.py (2): assert_project_root (110-113), start_cluster (220-245)
- tests/consensus_tests/fixtures.py (1): upsert_random_points (83-129)
- src/actix/api/service_api.rs (1): telemetry (36-59)
- tests/consensus_tests/assertions.py (1): assert_http_ok (6-13)
🪛 Ruff (0.12.2)
tests/consensus_tests/test_shard_snapshot_deadlock.py
6-6: from .assertions import * used; unable to detect undefined names
(F403)
7-7: from .fixtures import * used; unable to detect undefined names
(F403)
8-8: from .utils import * used; unable to detect undefined names
(F403)
12-12: assert_project_root may be undefined, or defined from star imports
(F405)
15-15: start_cluster may be undefined, or defined from star imports
(F405)
19-19: create_collection may be undefined, or defined from star imports
(F405)
22-22: Probable use of requests call without timeout
(S113)
30-30: Local variable snapshot_chunk is assigned to but never used
Remove assignment to unused variable snapshot_chunk
(F841)
36-36: Local variable upsert is assigned to but never used
Remove assignment to unused variable upsert
(F841)
36-36: upsert_random_points may be undefined, or defined from star imports
(F405)
40-40: Local variable telemetry is assigned to but never used
Remove assignment to unused variable telemetry
(F841)
45-45: assert_http_ok may be undefined, or defined from star imports
(F405)
🔇 Additional comments (10)
lib/collection/src/shards/replica_set/telemetry.rs (1)
45-60: Verify LocalShard is Clone or stored behind Arc
- lib/collection/src/shards/local_shard/mod.rs:89 defines LocalShard; repository search shows no derive/impl Clone or Option<Arc<LocalShard>>.
- Action: either derive/impl Clone for LocalShard, or store it behind Arc (e.g., change self.local to Option<Arc<LocalShard>>) and clone the Arc at call sites.
lib/collection/src/shards/forward_proxy_shard.rs (1)
345-355: LGTM: async forwarding avoids blocking and aligns with the new async telemetry API.
Straightforward awaits on the wrapped shard; consistent with analogous methods elsewhere.

lib/collection/src/shards/shard.rs (2)
92-101: Async delegation looks good. Async signature + awaited delegation across variants is correct.
104-111: Async delegation looks good. Consistent awaited forwarding for size stats.
lib/collection/src/shards/proxy_shard.rs (2)
152-154: Async delegation LGTM.
156-158: Async delegation LGTM.lib/collection/src/shards/queue_proxy_shard.rs (2)
197-202: Async delegation LGTM.
204-205: Async delegation LGTM.lib/collection/src/shards/local_shard/telemetry.rs (2)
12-78: Good async offload; fixes the deadlock vector. Moving sync RwLock reads into spawn_blocking and awaiting from async callers is the right call for Actix/Tokio. The construction of LocalShardTelemetry looks consistent with Shard::get_telemetry_data post-processing.
12-78: Sanity-check async telemetry call sites; ensure `.await` is used. Sandbox search returned no matches (lib/collection not present in this checkout). Run locally to find un-awaited calls:

```shell
rg -nP --hidden -S '\bget_(telemetry_data|optimization_status|size_stats)\s*\(' -g '!**/target/**' -g '!**/node_modules/**' -g '!**/.git/**' | grep -v '\.await'
```
File to check: lib/collection/src/shards/local_shard/telemetry.rs — verify ranges 12-78, 80-97, 99-134.
```rust
let Some(local) = local_shard.deref() else {
    return None;
};

Some(local.get_optimization_status().await)
}
```
🛠️ Refactor suggestion
Do not hold the self.local read-lock across .await; clone handle and drop the guard first.
Holding a tokio::RwLock read guard across an await can block writers (e.g., during reconfiguration), risking latency spikes or deadlocks. Clone/extract the shard, release the lock, then await.
```diff
-        let Some(local) = local_shard.deref() else {
-            return None;
-        };
-
-        Some(local.get_optimization_status().await)
+        // Extract local shard, drop the lock, then await.
+        let local = local_shard.clone();
+        drop(local_shard);
+        let Some(local) = local else { return None; };
+        Some(local.get_optimization_status().await)
```

If LocalShard isn't Clone, consider storing it behind Arc (e.g., Option<Arc<LocalShard>>) and cloning the Arc here.
Committable suggestion skipped: line range outside the PR's diff.
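The hazard this suggestion addresses can be demonstrated in miniature with asyncio (a toy sketch, not Qdrant code): a reader that awaits while holding the lock stalls a concurrent writer for the full duration of its await, whereas copying the needed state and releasing the guard first lets the writer proceed immediately.

```python
import asyncio
import time

async def reader_holding_lock(lock, state):
    async with lock:
        _copy = dict(state)       # read what we need...
        await asyncio.sleep(0.3)  # ...but await while STILL holding the lock

async def reader_releasing_first(lock, state):
    async with lock:
        _copy = dict(state)       # read; the guard is released at block exit
    await asyncio.sleep(0.3)      # slow work happens without the lock

async def writer(lock, state, out):
    await asyncio.sleep(0.01)     # let the reader grab the lock first
    t0 = time.monotonic()
    async with lock:              # blocked until the reader releases
        state["v"] = 1
    out["writer_wait"] = time.monotonic() - t0

async def run(reader):
    lock, state, out = asyncio.Lock(), {"v": 0}, {}
    await asyncio.gather(reader(lock, state), writer(lock, state, out))
    return out["writer_wait"]

bad_wait = asyncio.run(run(reader_holding_lock))      # writer stalls ~0.3s
good_wait = asyncio.run(run(reader_releasing_first))  # writer proceeds at once
print(round(bad_wait, 2), round(good_wait, 2))
```

The Rust situation is the same in spirit: a tokio::RwLock read guard held across an `.await` blocks any writer for the whole suspension.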
```rust
let Some(local) = local_shard.deref() else {
    return SizeStats::default();
};

local.get_size_stats().await
}
```
🛠️ Refactor suggestion
Same: release self.local read-lock before awaiting get_size_stats().
```diff
-        let Some(local) = local_shard.deref() else {
-            return SizeStats::default();
-        };
-
-        local.get_size_stats().await
+        let local = local_shard.clone();
+        drop(local_shard);
+        match local {
+            Some(local) => local.get_size_stats().await,
+            None => SizeStats::default(),
+        }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-        let Some(local) = local_shard.deref() else {
-            return SizeStats::default();
-        };
-
-        local.get_size_stats().await
+        let local = local_shard.clone();
+        drop(local_shard);
+        match local {
+            Some(local) => local.get_size_stats().await,
+            None => SizeStats::default(),
+        }
     }
```
```python
from .assertions import *
from .fixtures import *
from .utils import *
```
🛠️ Refactor suggestion
Replace star imports with explicit imports to satisfy Ruff and avoid namespace ambiguity.
Prevents F403/F405 and clarifies dependencies.
```diff
-from .assertions import *
-from .fixtures import *
-from .utils import *
+from .assertions import assert_http_ok
+from .fixtures import create_collection, upsert_random_points
+from .utils import assert_project_root, start_cluster
```

Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.12.2)
6-6: from .assertions import * used; unable to detect undefined names
(F403)
7-7: from .fixtures import * used; unable to detect undefined names
(F403)
8-8: from .utils import * used; unable to detect undefined names
(F403)
🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 6 to 8,
the file currently uses three wildcard imports (from .assertions import *, from
.fixtures import *, from .utils import *); replace these star imports with
explicit named imports from each module: inspect the test file to identify
exactly which functions/classes/constants are referenced from .assertions,
.fixtures and .utils, then change each line to import only those symbols (e.g.
from .assertions import assert_x, assert_y), run ruff/pytest to ensure no
F403/F405 errors remain, and remove any unused imports found.
```python
create_collection(peer_url)

# Request streaming shard snapshot
snapshot = requests.get(f"{peer_url}/collections/test_collection/shards/0/snapshot", stream = True)
```
🛠️ Refactor suggestion
Add a timeout to the snapshot request to avoid indefinite hangs in CI.
Even though the stream is intentionally left open, a finite timeout guards against stalls before the first chunk.
```diff
-    snapshot = requests.get(f"{peer_url}/collections/test_collection/shards/0/snapshot", stream = True)
+    snapshot = requests.get(
+        f"{peer_url}/collections/test_collection/shards/0/snapshot",
+        stream=True,
+        timeout=5,
+    )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
snapshot = requests.get(
    f"{peer_url}/collections/test_collection/shards/0/snapshot",
    stream=True,
    timeout=5,
)
```
🧰 Tools
🪛 Ruff (0.12.2)
22-22: Probable use of requests call without timeout
(S113)
🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around line 22, the
requests.get call for the shard snapshot uses stream=True but lacks a timeout
which can cause indefinite hangs in CI; add a timeout parameter (e.g.,
timeout=(5, 30) or a single timeout like timeout=10) to the requests.get
invocation so the call will fail fast if the connection or first read stalls
while preserving stream=True behavior.
```python
# Read only the first response chunk, but don't read response to completion. This ensures
# that snapshot processing started, but makes request "hang" while holding segment read-lock.
snapshot_stream = snapshot.iter_content(None)
snapshot_chunk = next(snapshot_stream)
```
🛠️ Refactor suggestion
Read a minimal chunk and assert progress; current None chunk size may block too long.
Using a 1-byte chunk yields the lock quickly and reduces flakiness; also assert non-empty chunk.
```diff
-    snapshot_stream = snapshot.iter_content(None)
-    snapshot_chunk = next(snapshot_stream)
+    snapshot_stream = snapshot.iter_content(chunk_size=1)
+    first_chunk = next(snapshot_stream)
+    assert first_chunk, "Snapshot stream produced an empty first chunk"
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Read only the first response chunk, but don't read response to completion. This ensures
# that snapshot processing started, but makes request "hang" while holding segment read-lock.
snapshot_stream = snapshot.iter_content(chunk_size=1)
first_chunk = next(snapshot_stream)
assert first_chunk, "Snapshot stream produced an empty first chunk"
```
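Why consuming a single chunk is enough to pin the lock, and why closing the stream later matters, can be modeled with a plain generator holding a lock (a toy model, not the requests or Qdrant API): the producer runs only up to its first `yield`, so the lock it took stays held while the stream sits idle, and closing the stream unwinds the `with` block and releases it.

```python
import threading

lock = threading.Lock()  # stands in for the segment read-lock

def snapshot_chunks():
    # Producer holds the lock for the whole stream, like the snapshot handler.
    with lock:
        for i in range(3):
            yield f"chunk-{i}".encode()

stream = snapshot_chunks()
first = next(stream)             # generator runs to the first yield; lock now held
held_mid_stream = lock.locked()
stream.close()                   # GeneratorExit unwinds the with-block, releasing the lock
held_after_close = lock.locked()
print(first, held_mid_stream, held_after_close)
```

This is also why the cleanup suggestion below recommends an explicit `snapshot.close()`: an abandoned half-read stream keeps its server-side lock alive.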
🧰 Tools
🪛 Ruff (0.12.2)
30-30: Local variable snapshot_chunk is assigned to but never used
Remove assignment to unused variable snapshot_chunk
(F841)
🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 27 to 31,
the test currently calls snapshot.iter_content(None) which can block too long;
change to snapshot.iter_content(1) to read a 1-byte chunk so the segment
read-lock is acquired quickly, then assert the returned snapshot_chunk is
non-empty (e.g., assert snapshot_chunk) to ensure progress was made.
```python
# Run background executor to send blocking requests without blocking the test
executor = ThreadPoolExecutor(max_workers = 2)

# Upsert a point, to block on segment write-lock
upsert = executor.submit(upsert_random_points, peer_url, 10)
time.sleep(0.1)

# Get telemetry, to block on segment read-lock, which would block Actix worker
telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
time.sleep(0.1)
```
🛠️ Refactor suggestion
Ensure resources are cleaned up: auto-shutdown the executor and close the snapshot stream.
Prevents thread leakage and long-lived open HTTP connections across test runs.
```diff
-    # Run background executor to send blocking requests without blocking the test
-    executor = ThreadPoolExecutor(max_workers = 2)
-
-    # Upsert a point, to block on segment write-lock
-    upsert = executor.submit(upsert_random_points, peer_url, 10)
-    time.sleep(0.1)
-
-    # Get telemetry, to block on segment read-lock, which would block Actix worker
-    telemetry = executor.submit(requests.get, f"{peer_url}/telemetry", params = { "details_level": "1" })
-    time.sleep(0.1)
-
-    # Try to query Qdrant version info, which would block and timeout if Actix worker is blocked
-    resp = requests.get(f"{peer_url}/", timeout = 0.5)
-    assert_http_ok(resp)
+    # Run background executor to send blocking requests without blocking the test
+    try:
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            # Upsert a point, to block on segment write-lock
+            executor.submit(upsert_random_points, peer_url, 10)
+            time.sleep(0.1)
+
+            # Get telemetry, to block on segment read-lock, which would block Actix worker
+            executor.submit(
+                requests.get,
+                f"{peer_url}/telemetry",
+                params={"details_level": "1"},
+                timeout=5,
+            )
+            time.sleep(0.1)
+
+            # Try to query Qdrant version info, which would block and timeout if Actix worker is blocked
+            resp = requests.get(f"{peer_url}/", timeout=1.0)
+            assert_http_ok(resp)
+    finally:
+        snapshot.close()
```

Also applies to: 45-45
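The cleanup pattern can be sketched standalone (toy code: `FakeStream` is a hypothetical stand-in for the requests response object, and `slow_task` for the blocked upsert): the `with` block guarantees `shutdown(wait=True)` joins worker threads even if an assertion fails, and `finally` guarantees the stream is closed.

```python
import time
from concurrent.futures import ThreadPoolExecutor

class FakeStream:
    """Hypothetical stand-in for a streamed requests.Response."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def slow_task(results):
    time.sleep(0.05)          # pretend this is a request blocked on a lock
    results.append("done")

snapshot = FakeStream()
results = []
try:
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(slow_task, results)
        # ... assertions against the server would go here ...
finally:
    snapshot.close()

# Leaving the with-block joined the worker; finally closed the stream.
print(results, snapshot.closed)
```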
🧰 Tools
🪛 Ruff (0.12.2)
36-36: Local variable upsert is assigned to but never used
Remove assignment to unused variable upsert
(F841)
36-36: upsert_random_points may be undefined, or defined from star imports
(F405)
40-40: Local variable telemetry is assigned to but never used
Remove assignment to unused variable telemetry
(F841)
🤖 Prompt for AI Agents
In tests/consensus_tests/test_shard_snapshot_deadlock.py around lines 32-41 (and
also apply same fix at line 45), the ThreadPoolExecutor and the HTTP
snapshot/telemetry response are left open causing thread leakage and lingering
connections; ensure the executor is shut down and any requests response/stream
is closed: capture the Future/response objects, call
executor.shutdown(wait=True) after the background tasks complete (or use a
context manager) and explicitly close response objects (response.close() or use
with requests.get(...) as resp) for the snapshot/telemetry calls so all threads
and HTTP connections are cleaned up.
```diff
-            segments: (!segments.is_empty()).then_some(segments),
+            segments: if segments.is_empty() {
+                None
+            } else {
+                Some(segments)
+            },
```
* Add `test_shard_snapshot_deadlock` integration test
* Fix deadlock during streaming shard snapshot
This PR:
Changes telemetry methods of `LocalShard` to be `async` and use `spawn_blocking`, so that telemetry API won't block Actix runtime with a blocking read-lock call.

All Submissions:
`dev` branch. Did you create your branch from `dev`?

New Feature Submissions:
- Have you formatted your code using the `cargo +nightly fmt --all` command prior to submission?
- Have you checked your code using the `cargo clippy --all --all-features` command?

Changes to Core Features: