
Add timeout to cancel metrics and telemetry calls #7579

Merged
timvisee merged 3 commits into dev from
timeout-cancels-metrics-telemetry
Nov 25, 2025

Conversation

@agourlay
Member

@agourlay agourlay commented Nov 21, 2025

This PR adds timeout support for the telemetry endpoint and the metrics endpoint.

The goal is to prevent potentially long-running computations from conflicting with other workloads.

This new timeout is now always present internally with a default value of 60 seconds.
It can be changed to any positive value via the API to handle extreme cases.

The telemetry/metrics workload cancellation works at several levels to accommodate the synchronous operations involved:

  1. an async tokio::timeout on the top-level future
  2. propagation into blocking tasks via a cancellation token that signals on drop
  3. an upper bound on the maximum time to wait to acquire the underlying sync. locks
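As a rough illustration of the first two layers, here is a minimal std-only sketch (no tokio; `StopFlag` and `run_with_timeout` are hypothetical names invented for this example, not the PR's actual types): a worker thread periodically polls a shared stop flag, the caller waits on a channel with a deadline, and returning early drops the flag handle, which signals the worker to bail out.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the PR's cancellation token:
// any clone sets the shared flag when dropped.
#[derive(Clone)]
struct StopFlag(Arc<AtomicBool>);

impl StopFlag {
    fn new() -> Self {
        StopFlag(Arc::new(AtomicBool::new(false)))
    }
    fn is_stopped(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

impl Drop for StopFlag {
    fn drop(&mut self) {
        self.0.store(true, Ordering::Relaxed);
    }
}

// Layer 1: bound the overall wait. Layer 2: signal the worker on early return.
fn run_with_timeout(timeout: Duration) -> Result<u64, String> {
    let flag = StopFlag::new();
    let worker_flag = flag.clone();
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let mut processed = 0u64;
        // A "blocking computation" that periodically checks for cancellation.
        for _ in 0..1000 {
            if worker_flag.is_stopped() {
                return; // cancelled: give up without sending a result
            }
            processed += 1;
        }
        let _ = tx.send(processed);
    });

    match rx.recv_timeout(timeout) {
        Ok(processed) => Ok(processed),
        // Returning here drops `flag`, which flips the shared stop bit
        // so the worker stops instead of running to completion.
        Err(_) => Err(format!("timed out after {timeout:?}")),
    }
}

fn main() {
    println!("{:?}", run_with_timeout(Duration::from_secs(2)));
}
```

The real implementation uses tokio's timeout and blocking pool rather than raw threads, but the drop-based signalling works the same way.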

Before this PR, the telemetry/metrics APIs were not fallible:
they would always return successfully, at the cost of returning partial results.

Both APIs are now fallible so that internal timeouts can bubble up.

@agourlay agourlay force-pushed the timeout-cancels-metrics-telemetry branch from 46f3803 to 5243bdc on November 24, 2025 12:42
@agourlay agourlay changed the title [WIP] Add timeout to cancel metrics and telemetry calls Add timeout to cancel metrics and telemetry calls Nov 24, 2025
@agourlay agourlay marked this pull request as ready for review November 24, 2025 14:00
coderabbitai[bot]

This comment was marked as resolved.

@qdrant qdrant deleted a comment from coderabbitai bot Nov 24, 2025
@agourlay agourlay force-pushed the timeout-cancels-metrics-telemetry branch 2 times, most recently from 36a781a to f5fb11b on November 24, 2025 17:04
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (10)
lib/storage/src/content_manager/toc/telemetry.rs (1)

65-82: Consider checking is_stopped_guard before snapshot telemetry collection.

While snapshot telemetry collection is likely fast (reading atomic counters), checking is_stopped_guard before line 65 would make cancellation behavior more consistent throughout the function.

Apply this diff to add the check:

+        if is_stopped_guard.is_stopped() {
+            return Ok(TocTelemetryData {
+                collection_telemetry,
+                snapshot_telemetry: Vec::new(),
+            });
+        }
+
         let snapshot_telemetry: Vec<_> = self
openapi/openapi-service.ytt.yaml (2)

42-49: Clarify timeout units in the parameter description.

The timeout parameter description should specify that the value is in seconds to avoid ambiguity for API consumers.

Consider updating the description to:

-          description: "Timeout for this request"
+          description: "Timeout for this request in seconds"

66-73: Clarify timeout units in the parameter description.

The timeout parameter description should specify that the value is in seconds to avoid ambiguity for API consumers.

Consider updating the description to:

-          description: "Timeout for this request"
+          description: "Timeout for this request in seconds"
docs/redoc/master/openapi.json (2)

247-257: Clarify units/range and add example for timeout (consistency with other endpoints).

Current description is vague. Recommend explicitly stating seconds, allowed range, and default behavior; add an example for better SDK docs.

           {
             "name": "timeout",
             "in": "query",
-            "description": "Timeout for this request",
+            "description": "If set, overrides the internal timeout for this request. Unit is seconds. Allowed range: 1–10. If omitted, the server default is used.",
             "required": false,
             "schema": {
               "type": "integer",
               "minimum": 1,
-              "maximum": 10
-            }
+              "maximum": 10
+            },
+            "example": 5
           }

337-347: Mirror telemetry timeout wording for metrics; add example.

Keep the same phrasing and example to avoid ambiguity and ease client generation.

           {
             "name": "timeout",
             "in": "query",
-            "description": "Timeout for this request",
+            "description": "If set, overrides the internal timeout for this request. Unit is seconds. Allowed range: 1–10. If omitted, the server default is used.",
             "required": false,
             "schema": {
               "type": "integer",
               "minimum": 1,
-              "maximum": 10
-            }
+              "maximum": 10
+            },
+            "example": 5
           }
src/common/telemetry_reporting.rs (1)

7-7: Telemetry reporter error handling and result type look consistent

TelemetryReporter::report now uses StorageResult<()>, propagates failures from prepare_data, JSON serialization, and HTTP I/O, and logs non‑success HTTP statuses while still returning Ok(()). This matches a best‑effort, background reporting model and aligns with the new fallible telemetry plumbing. Passing None as timeout to prepare_data is also reasonable if this path intentionally avoids the API timeout constraints.

If you want producer telemetry to respect the same internal timeout as the public telemetry endpoint, consider threading a concrete Duration here instead of None so this background task can’t compete indefinitely with foreground workloads.

Also applies to: 40-63

lib/collection/src/collection/telemetry.rs (1)

1-2: Collection telemetry now correctly participates in timeout/cancellation

The updated Collection::get_telemetry_data signature and body cleanly propagate timeout and StoppingGuard down to shards and return CollectionResult<CollectionTelemetry>, letting shard‑level timeouts bubble up while preserving the existing telemetry shape. This is consistent with the new timeout‑aware shard APIs.

If you ever see this become a bottleneck with many shards, consider parallelizing the per‑shard get_telemetry_data calls under the read lock or snapshotting the shard list first and then gathering telemetry concurrently, while still honoring the same timeout and stopping guard semantics.

Also applies to: 4-4, 7-7, 11-16, 22-26, 44-52

lib/collection/src/shards/shard.rs (1)

4-4: Shard::get_telemetry_data correctly unifies timeout/guard-aware telemetry retrieval

The Shard enum’s get_telemetry_data now consistently:

  • Accepts timeout: Duration and &StoppingGuard.
  • Delegates to each concrete shard’s timeout/guard‑aware telemetry method, propagating CollectionResult errors.
  • Preserves the existing status enrichment for local shards and variant_name tagging for all variants.

This is a clean centralization of the new telemetry behavior with no obvious edge‑case regressions.

If local_shard_status() ever becomes a source of blocking or contention, you may later want to make it timeout/guard‑aware as well (or at least guard it with the same StoppingGuard), but it’s reasonable to keep it straightforward for now.

Also applies to: 12-12, 75-111

lib/collection/src/shards/replica_set/telemetry.rs (1)

13-51: Replica set telemetry timeout/guard propagation looks consistent; consider remotes

The replica-set get_telemetry_data correctly threads timeout and StoppingGuard into the local shard and wraps everything in CollectionResult, so local timeouts/cancellations can bubble up cleanly. The remote side still calls remote.get_telemetry_data(detail) without timeout/guard; if remote telemetry can ever block on IO or long‑held locks, you may want to extend that API similarly so a timed‑out or cancelled overall telemetry request doesn’t keep waiting on remotes.

src/actix/api/service_api.rs (1)

85-127: Metrics timeout/error handling looks correct; consider improving timing measurement

The metrics handler now treats prepare_data as fallible and routes failures through process_response_error, while only anonymizing and formatting metrics on the success path, which aligns with the new timeout behavior. One small improvement: to get a more accurate elapsed time in error logs, you might want to capture let timing = Instant::now(); once near the top of the handler and reuse it in both the access check failure and the Err(err) branch instead of calling Instant::now() at the error site.

 #[get("/metrics")]
 async fn metrics(
-    telemetry_collector: Data<Mutex<TelemetryCollector>>,
+    telemetry_collector: Data<Mutex<TelemetryCollector>>,
     params: Query<MetricsParam>,
     config: Data<ServiceConfig>,
     ActixAccess(access): ActixAccess,
 ) -> HttpResponse {
-    if let Err(err) = access.check_global_access(AccessRequirements::new()) {
-        return process_response_error(err, Instant::now(), None);
-    }
+    let timing = Instant::now();
+    if let Err(err) = access.check_global_access(AccessRequirements::new()) {
+        return process_response_error(err, timing, None);
+    }
@@
-    match telemetry_data {
-        Err(err) => process_response_error(err, Instant::now(), None),
+    match telemetry_data {
+        Err(err) => process_response_error(err, timing, None),
         Ok(telemetry_data) => {
             let telemetry_data = if anonymize {
                 telemetry_data.anonymize()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a92c69 and f3a8a3e.

📒 Files selected for processing (18)
  • docs/redoc/master/openapi.json (2 hunks)
  • lib/collection/src/collection/telemetry.rs (2 hunks)
  • lib/collection/src/shards/forward_proxy_shard.rs (2 hunks)
  • lib/collection/src/shards/local_shard/telemetry.rs (3 hunks)
  • lib/collection/src/shards/proxy_shard.rs (2 hunks)
  • lib/collection/src/shards/queue_proxy_shard.rs (2 hunks)
  • lib/collection/src/shards/replica_set/telemetry.rs (2 hunks)
  • lib/collection/src/shards/shard.rs (2 hunks)
  • lib/collection/src/shards/telemetry.rs (1 hunks)
  • lib/shard/src/common/stopping_guard.rs (2 hunks)
  • lib/storage/src/content_manager/errors.rs (1 hunks)
  • lib/storage/src/content_manager/toc/telemetry.rs (3 hunks)
  • openapi/openapi-service.ytt.yaml (2 hunks)
  • src/actix/api/service_api.rs (5 hunks)
  • src/actix/helpers.rs (3 hunks)
  • src/common/telemetry.rs (4 hunks)
  • src/common/telemetry_ops/collections_telemetry.rs (4 hunks)
  • src/common/telemetry_reporting.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-10-13T22:58:03.121Z
Learnt from: generall
Repo: qdrant/qdrant PR: 7400
File: lib/segment/src/id_tracker/simple_id_tracker.rs:234-241
Timestamp: 2025-10-13T22:58:03.121Z
Learning: SimpleIdTracker in lib/segment/src/id_tracker/simple_id_tracker.rs is being deprecated and should not receive fixes related to version tracking or recovery logic, as it has a different version storage structure that is incompatible with newer trackers.

Applied to files:

  • lib/collection/src/shards/shard.rs
📚 Learning: 2025-07-11T11:35:21.549Z
Learnt from: generall
Repo: qdrant/qdrant PR: 6854
File: lib/segment/src/index/query_estimator.rs:320-327
Timestamp: 2025-07-11T11:35:21.549Z
Learning: In test code for Qdrant's query estimator (lib/segment/src/index/query_estimator.rs), simplified ID resolution logic using `id.to_string().parse().unwrap()` is acceptable for testing purposes and doesn't need to match production code's `id_tracker.internal_id()` approach. Test code can use mock implementations that serve the testing goals.

Applied to files:

  • lib/collection/src/shards/shard.rs
📚 Learning: 2025-09-01T11:42:06.964Z
Learnt from: timvisee
Repo: qdrant/qdrant PR: 7157
File: lib/shard/src/segment_holder/mod.rs:808-814
Timestamp: 2025-09-01T11:42:06.964Z
Learning: In Qdrant's segment holder, panicking when no segments exist during flush_all is intentional and preferred over graceful error handling, as having zero segments could permanently corrupt the WAL by acknowledging u64::MAX. The maintainers consider this condition impossible and use the panic as a fail-fast safety mechanism to prevent data corruption.

Applied to files:

  • lib/collection/src/shards/shard.rs
📚 Learning: 2025-08-10T18:30:02.986Z
Learnt from: generall
Repo: qdrant/qdrant PR: 7006
File: lib/collection/src/operations/verification/update.rs:158-174
Timestamp: 2025-08-10T18:30:02.986Z
Learning: In Qdrant's strict mode verification code (lib/collection/src/operations/verification/update.rs), exhaustive pattern matching without `..` is intentionally used for structs like PointsBatch and PointsList. This design pattern ensures compilation fails when new fields are added, forcing developers to explicitly consider how new fields should be handled in the indexed_filter_write method. This provides visibility and compile-time safety for struct evolution.

Applied to files:

  • lib/collection/src/shards/proxy_shard.rs
  • lib/collection/src/shards/forward_proxy_shard.rs
  • src/common/telemetry.rs
🧬 Code graph analysis (10)
lib/collection/src/shards/queue_proxy_shard.rs (3)
lib/collection/src/shards/forward_proxy_shard.rs (1)
  • get_telemetry_data (360-369)
lib/collection/src/shards/proxy_shard.rs (1)
  • get_telemetry_data (148-157)
lib/collection/src/shards/shard.rs (1)
  • get_telemetry_data (75-111)
lib/collection/src/shards/shard.rs (8)
lib/collection/src/collection/telemetry.rs (1)
  • get_telemetry_data (11-53)
lib/collection/src/shards/forward_proxy_shard.rs (1)
  • get_telemetry_data (360-369)
lib/collection/src/shards/proxy_shard.rs (1)
  • get_telemetry_data (148-157)
lib/collection/src/shards/queue_proxy_shard.rs (1)
  • get_telemetry_data (190-200)
lib/storage/src/content_manager/toc/telemetry.rs (1)
  • get_telemetry_data (43-88)
lib/collection/src/shards/dummy_shard.rs (1)
  • get_telemetry_data (63-78)
lib/segment/src/segment/entry.rs (1)
  • get_telemetry_data (881-898)
lib/collection/src/shards/remote_shard.rs (1)
  • get_telemetry_data (202-215)
lib/collection/src/shards/proxy_shard.rs (7)
lib/collection/src/shards/forward_proxy_shard.rs (1)
  • get_telemetry_data (360-369)
lib/collection/src/shards/queue_proxy_shard.rs (1)
  • get_telemetry_data (190-200)
lib/collection/src/shards/replica_set/telemetry.rs (1)
  • get_telemetry_data (13-51)
lib/collection/src/shards/shard.rs (1)
  • get_telemetry_data (75-111)
lib/segment/src/index/hnsw_index/hnsw.rs (1)
  • get_telemetry_data (1458-1472)
lib/segment/src/segment/entry.rs (1)
  • get_telemetry_data (881-898)
lib/segment/src/index/vector_index_base.rs (2)
  • get_telemetry_data (36-36)
  • get_telemetry_data (241-261)
lib/collection/src/shards/forward_proxy_shard.rs (4)
lib/collection/src/shards/local_shard/telemetry.rs (1)
  • get_telemetry_data (21-118)
lib/collection/src/shards/proxy_shard.rs (1)
  • get_telemetry_data (148-157)
lib/collection/src/shards/queue_proxy_shard.rs (1)
  • get_telemetry_data (190-200)
lib/collection/src/shards/shard.rs (1)
  • get_telemetry_data (75-111)
lib/storage/src/content_manager/toc/telemetry.rs (1)
lib/shard/src/common/stopping_guard.rs (1)
  • new (12-16)
lib/collection/src/shards/replica_set/telemetry.rs (4)
lib/collection/src/shards/forward_proxy_shard.rs (1)
  • get_telemetry_data (360-369)
lib/collection/src/shards/proxy_shard.rs (1)
  • get_telemetry_data (148-157)
lib/collection/src/shards/queue_proxy_shard.rs (1)
  • get_telemetry_data (190-200)
lib/collection/src/shards/shard.rs (1)
  • get_telemetry_data (75-111)
src/common/telemetry.rs (3)
lib/collection/src/operations/verification/mod.rs (1)
  • new_unchecked_verification_pass (26-28)
lib/storage/src/content_manager/errors.rs (1)
  • timeout (103-110)
lib/shard/src/common/stopping_guard.rs (1)
  • new (12-16)
src/actix/api/service_api.rs (3)
src/actix/helpers.rs (2)
  • time (141-147)
  • process_response_error (119-125)
src/actix/api/collections_api.rs (1)
  • timeout (29-31)
src/actix/api/read_params.rs (1)
  • timeout (19-21)
lib/storage/src/content_manager/errors.rs (1)
lib/collection/src/operations/types.rs (5)
  • service_error (944-949)
  • bad_request (967-971)
  • bad_input (951-955)
  • not_found (957-959)
  • timeout (935-942)
lib/collection/src/shards/local_shard/telemetry.rs (4)
lib/segment/src/vector_storage/common.rs (1)
  • get_async_scorer (12-14)
lib/collection/src/shards/shard.rs (1)
  • get_telemetry_data (75-111)
lib/storage/src/content_manager/errors.rs (1)
  • timeout (103-110)
lib/common/common/src/types.rs (1)
  • default (68-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: e2e-tests
  • GitHub Check: test-shard-snapshot-api-s3-minio
  • GitHub Check: integration-tests-consensus
  • GitHub Check: integration-tests
  • GitHub Check: test-consistency
  • GitHub Check: rust-tests (windows-latest)
  • GitHub Check: test-consensus-compose
  • GitHub Check: rust-tests-no-rocksdb (ubuntu-latest)
  • GitHub Check: rust-tests (macos-latest)
  • GitHub Check: rust-tests (ubuntu-latest)
  • GitHub Check: lint
  • GitHub Check: Build Qdrant Edge Python bindings
🔇 Additional comments (19)
lib/storage/src/content_manager/toc/telemetry.rs (4)

3-3: LGTM! Imports align with timeout and cancellation support.

The new imports for Duration, CollectionResult, and StoppingGuard are appropriate for the timeout and cancellation functionality being added.

Also applies to: 5-5, 12-12


43-49: LGTM! Method signature correctly supports timeout and cancellation.

The updated signature properly accepts timeout and is_stopped_guard parameters and returns CollectionResult to allow timeout errors to propagate.


51-62: LGTM! Proper cancellation and timeout propagation.

The early break when is_stopped_guard.is_stopped() allows graceful cancellation with partial results, and the timeout and stopping guard are correctly propagated to collection.get_telemetry_data().


90-106: The suggested fix is incompatible with the method signature.

The collection method get_aggregated_telemetry_data() at lib/collection/src/collection/mod.rs:786 only accepts &self and returns CollectionsAggregatedTelemetry. It does not support timeout or is_stopped_guard parameters, unlike get_telemetry_data() which does accept these parameters (at lib/collection/src/collection/telemetry.rs:11).

The design intentionally differs between the two methods. The review comment's suggested diff to pass timeout and is_stopped_guard to get_aggregated_telemetry_data() would fail compilation because the method signature does not support these parameters.

Likely an incorrect or invalid review comment.

lib/shard/src/common/stopping_guard.rs (2)

5-5: LGTM!

Adding Clone to StoppingGuard is appropriate since it wraps an Arc<AtomicBool>, enabling the guard to be shared across multiple telemetry collection tasks.


22-24: LGTM!

The is_stopped() method using Relaxed ordering is appropriate for this cancellation flag use case, consistent with the existing Drop implementation.

lib/collection/src/shards/queue_proxy_shard.rs (1)

190-200: LGTM!

The updated signature correctly aligns with the timeout and cancellation patterns implemented across other shard types (ForwardProxyShard, ProxyShard, LocalShard). The method properly forwards the timeout and stopping guard to the wrapped shard and returns a CollectionResult for error propagation.

lib/collection/src/shards/telemetry.rs (1)

38-38: LGTM!

Adding Default to LocalShardTelemetry is safe and useful. All fields are either Option types or have Default implementations (e.g., OptimizerTelemetry on line 77), making this a sensible addition for initializing telemetry structs.

src/actix/helpers.rs (1)

12-12: LGTM!

Switching to the StorageResult<T> type alias improves consistency with the storage crate conventions. This is semantically equivalent to the previous Result<T, StorageError> signature and causes no functional change.

Also applies to: 143-143, 153-153

src/common/telemetry_reporting.rs (1)

69-71: Logging on report failures is appropriate

Wrapping report in if let Err(err) and logging the error keeps the hourly loop resilient while surfacing genuine failures (serialization/HTTP errors) without crashing the background task.

lib/collection/src/shards/proxy_shard.rs (1)

18-18: ProxyShard telemetry forwarding matches the new shard telemetry contract

ProxyShard::get_telemetry_data correctly mirrors the other proxy variants by accepting (detail, timeout, &StoppingGuard) and forwarding all three to the wrapped LocalShard, returning CollectionResult<LocalShardTelemetry>. This keeps the shard enum implementation simple and consistent.

Also applies to: 148-157

lib/collection/src/shards/forward_proxy_shard.rs (1)

17-17: ForwardProxyShard telemetry timeout/guard wiring is correct

The new get_telemetry_data method cleanly forwards detail, timeout, and is_stopped_guard to the wrapped LocalShard, returning CollectionResult<LocalShardTelemetry>. This keeps the forward proxy in line with other shard variants and integrates it into the timeout/cancellation pipeline.

Also applies to: 360-369

src/common/telemetry_ops/collections_telemetry.rs (1)

36-80: Collections telemetry orchestration with timeout/guard looks consistent

The revised CollectionsTelemetry::collect cleanly propagates the new timeout and StoppingGuard down into TableOfContent::get_telemetry_data and get_aggregated_telemetry_data, and now returns CollectionResult<Self>. This keeps the level‑based behavior intact (full vs aggregated telemetry) while letting shard/collection timeouts and cancellations bubble up through a single CollectionResult path.

lib/storage/src/content_manager/errors.rs (1)

53-111: StorageError constructors and timeout helper are consistent

Returning Self from the helper constructors and adding StorageError::timeout(timeout_sec, operation) aligns this module with the existing CollectionError helpers and simplifies error construction. The new timeout variant is a good fit for surfacing telemetry/metrics timeouts with a clear, formatted description.
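A minimal sketch of the constructor-plus-alias pattern described above (the variant shape and message wording here are illustrative assumptions, not copied from the crate):

```rust
use std::fmt;

// Illustrative error type: the real StorageError has more variants.
#[derive(Debug, PartialEq)]
enum StorageError {
    Timeout { description: String },
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::Timeout { description } => write!(f, "Timeout error: {description}"),
        }
    }
}

// Type alias used throughout the storage crate.
type StorageResult<T> = Result<T, StorageError>;

impl StorageError {
    // Helper constructor returning Self, mirroring the pattern described above.
    fn timeout(timeout_sec: usize, operation: &str) -> Self {
        StorageError::Timeout {
            description: format!("Operation '{operation}' timed out after {timeout_sec} seconds"),
        }
    }
}

// Hypothetical caller: surface a timeout as a typed error.
fn collect_telemetry(elapsed_sec: usize, budget_sec: usize) -> StorageResult<&'static str> {
    if elapsed_sec > budget_sec {
        return Err(StorageError::timeout(budget_sec, "collections telemetry"));
    }
    Ok("telemetry payload")
}

fn main() {
    println!("{:?}", collect_telemetry(5, 3));
}
```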

lib/collection/src/shards/local_shard/telemetry.rs (1)

21-70: Remove incorrect review concerns; implementation handles error conversion and type conversion appropriately

The original review contains two significant errors:

  1. JoinError conversion exists: The codebase already has impl From<JoinError> for CollectionError at lib/collection/src/operations/types.rs:1179, so AbortOnDropHandle::new(handle).await? will compile and work correctly. The flagged compilation risk does not exist.

  2. HashMap wrapping is necessary, not redundant: SizeStats::num_vectors_by_name is defined as TinyMap<VectorNameBuf, usize>, not HashMap. The code at lines 94–117 correctly converts from TinyMap to HashMap with HashMap::from(num_vectors_by_name) before storing in LocalShardTelemetry. This is a required type conversion, not unnecessary wrapping.

The implementation's timeout handling, spawn_blocking usage, and StoppingGuard integration remain sound. No changes are needed.

src/actix/api/service_api.rs (4)

3-18: Timeout and validation imports are coherent and used correctly

The added imports for Duration, Data, Query, and Validate line up with the new timeout helpers and validated query extractors; everything appears used and scoped appropriately.


29-41: TelemetryParam timeout field and helper look sound

The timeout: Option<u64> with #[validate(range(min = 1, max = 10))] plus the timeout(&self) -> Option<Duration> helper gives a clear 1–10s bound while keeping the conversion logic simple. This mirrors the existing pattern in other APIs and should behave well with actix_web_validator::Query automatically enforcing the range.
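A std-only sketch of that parameter shape (the derive-based `#[validate(range(...))]` check is modeled by hand here; field and method names follow the description above, details approximate):

```rust
use std::time::Duration;

// Sketch of the query parameter: an optional timeout in seconds,
// restricted to 1..=10.
struct TelemetryParam {
    timeout: Option<u64>,
}

impl TelemetryParam {
    // Hand-rolled equivalent of the range validation attribute.
    fn validate(&self) -> Result<(), String> {
        match self.timeout {
            Some(t) if !(1..=10).contains(&t) => {
                Err(format!("timeout must be within 1..=10, got {t}"))
            }
            _ => Ok(()),
        }
    }

    // Mirrors the helper described above: map seconds into a Duration.
    fn timeout(&self) -> Option<Duration> {
        self.timeout.map(Duration::from_secs)
    }
}

fn main() {
    let p = TelemetryParam { timeout: Some(5) };
    assert!(p.validate().is_ok());
    println!("{:?}", p.timeout());
}
```

In the real handler the range is enforced by actix_web_validator before the handler runs, so the helper only needs to do the unit conversion.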


43-70: Telemetry handler correctly propagates timeout into telemetry collection

Using Data<Mutex<TelemetryCollector>> and calling prepare_data(&access, detail, params.timeout()).await? threads the optional timeout through without changing the anonymization logic. The error propagation via ? inside helpers::time matches the pattern used for other fallible endpoints in this file.


72-83: MetricsParam timeout configuration is consistent with TelemetryParam

MetricsParam mirrors TelemetryParam with the same 1–10s validation and timeout() helper, which keeps behavior consistent across /telemetry and /metrics while keeping the mapping to Duration trivial.

Comment on lines +80 to 118

     pub async fn prepare_data(
         &self,
         access: &Access,
         detail: TelemetryDetail,
         timeout: Option<Duration>,
     ) -> StorageResult<TelemetryData> {
         let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
         // Use blocking pool because the collection telemetry acquires several sync. locks.
-        let collections_telemetry = {
+        let collections_telemetry_handle = {
             let toc = self
                 .dispatcher
                 .toc(access, &new_unchecked_verification_pass())
                 .clone();
             let runtime_handle = toc.general_runtime_handle().clone();
             let access_collection = access.clone();
             let is_stopped_guard = StoppingGuard::new();

             let handle = runtime_handle.spawn_blocking(move || {
                 // Re-enter the async runtime in this blocking thread
                 tokio::runtime::Handle::current().block_on(async move {
-                    CollectionsTelemetry::collect(detail, &access_collection, &toc).await
+                    CollectionsTelemetry::collect(
+                        detail,
+                        &access_collection,
+                        &toc,
+                        timeout,
+                        &is_stopped_guard,
+                    )
+                    .await
                 })
             });
-            AbortOnDropHandle::new(handle).await
+            AbortOnDropHandle::new(handle)
         };

-        let collections_telemetry = collections_telemetry
-            .map_err(|err| log::error!("Failed to generate collection telemetry {err}"))
-            .unwrap_or_default();
+        let collections_telemetry = tokio::time::timeout(timeout, collections_telemetry_handle)
+            .await
+            .map_err(|_: Elapsed| {
+                StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
+            })???;

Contributor

⚠️ Potential issue | 🟠 Major


prepare_data timeout/cancellation wiring: guard not observable from outside + triple-? readability

Two things to consider here:

  1. StoppingGuard lifetime / effectiveness

    StoppingGuard::new() is created inside the spawn_blocking closure and moved into it. Because StoppingGuard uses Arc<AtomicBool> and derives Clone, all copies share the same flag. However, the guard created inside the blocking closure is owned only by the blocking task—when the outer tokio::time::timeout returns due to timeout or error, the guard is not dropped and therefore cannot signal cancellation to the blocking code via its Drop implementation.

    If the intent is to propagate the outer timeout/cancellation into blocking code via StoppingGuard, move the guard creation outside and clone it into the blocking task. This way, dropping the outer guard on early return (due to timeout or error) will set is_stopped to true, and all inner clones will immediately see this change.

    For example:

    -    pub async fn prepare_data(
    -        &self,
    -        access: &Access,
    -        detail: TelemetryDetail,
    -        timeout: Option<Duration>,
    -    ) -> StorageResult<TelemetryData> {
    -        let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
    -        // Use blocking pool because the collection telemetry acquires several sync. locks.
    -        let collections_telemetry_handle = {
    +    pub async fn prepare_data(
    +        &self,
    +        access: &Access,
    +        detail: TelemetryDetail,
    +        timeout: Option<Duration>,
    +    ) -> StorageResult<TelemetryData> {
    +        let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
    +        let is_stopped_guard = StoppingGuard::new();
    +        // Use blocking pool because the collection telemetry acquires several sync. locks.
    +        let collections_telemetry_handle = {
                let toc = self
                    .dispatcher
                    .toc(access, &new_unchecked_verification_pass())
                    .clone();
                let runtime_handle = toc.general_runtime_handle().clone();
                let access_collection = access.clone();
    -            let is_stopped_guard = StoppingGuard::new();
    +            let is_stopped_guard = is_stopped_guard.clone();
    @@
    -            AbortOnDropHandle::new(handle)
    -        };
    -
    -        let collections_telemetry = tokio::time::timeout(timeout, collections_telemetry_handle)
    -            .await
    -            .map_err(|_: Elapsed| {
    -                StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
    -            })???;
    +            AbortOnDropHandle::new(handle)
    +        };
    +
    +        let join_result = tokio::time::timeout(timeout, collections_telemetry_handle)
    +            .await
    +            .map_err(|_: Elapsed| {
    +                StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
    +            })?;
    +        let collections_telemetry = join_result??;
  2. Triple ??? clarity

    The current ??? chain is type-correct (outer timeout error, then join error, then collection error), but is non-obvious at a glance. Splitting as shown above keeps behavior identical while making the error flow much clearer.
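The nested type shape behind that `???` chain can be reproduced with plain nested `Result`s. This is a self-contained sketch, not the real code: all three error layers are collapsed to `String` here, whereas the actual layers are `Elapsed` (mapped to a storage error), `JoinError`, and the collection error.

```rust
// Stand-in for the real Result<Result<Result<T, CollectionError>, JoinError>, Elapsed>
// shape, with every error layer collapsed to String for illustration.
type Nested = Result<Result<Result<u64, String>, String>, String>;

// Chained form: each `?` peels one layer, left to right.
fn chained(nested: Nested) -> Result<u64, String> {
    let value = nested???;
    Ok(value)
}

// Split form: identical behavior, but each failure point is named.
fn split(nested: Nested) -> Result<u64, String> {
    let join_result = nested?; // outer timeout layer
    let telemetry = join_result?; // task join layer
    let value = telemetry?; // domain error layer
    Ok(value)
}
```

Both forms peel the layers in the same order, so swapping one for the other cannot change which error is surfaced, only how readable the call site is.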

🤖 Prompt for AI Agents
In src/common/telemetry.rs around lines 80-118, move the StoppingGuard::new()
creation out of the spawn_blocking closure and clone it into the blocking task
so the outer guard is dropped on timeout/cancellation and can signal the inner
task via its Arc flag; then replace the single chained ??? at the timeout/await
with explicit, sequential error handling: first await tokio::time::timeout(...)
and map the Elapsed into a StorageError timeout, then await the spawned task /
JoinHandle and map any join error into an appropriate StorageError, and finally
propagate the collection error — this preserves behavior but makes the
cancellation observable from outside and the error flow clear.
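The guard-cloning pattern the review suggests can be sketched in isolation. This reconstruction is based only on the description above (an `Arc<AtomicBool>` with derived `Clone` and a `Drop` that sets the flag); it is not Qdrant's actual `lib/shard` implementation, and a plain `std::thread` stands in for `spawn_blocking`.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Stand-in for the StoppingGuard described above: cloneable handle over a
// shared flag; any owner's Drop sets the flag, signalling every clone.
#[derive(Clone)]
struct StoppingGuard {
    is_stopped: Arc<AtomicBool>,
}

impl StoppingGuard {
    fn new() -> Self {
        Self { is_stopped: Arc::new(AtomicBool::new(false)) }
    }
    fn is_stopped(&self) -> bool {
        self.is_stopped.load(Ordering::Relaxed)
    }
}

impl Drop for StoppingGuard {
    fn drop(&mut self) {
        self.is_stopped.store(true, Ordering::Relaxed);
    }
}

// Simulates the blocking telemetry work: checks the flag cooperatively
// and bails out early once any guard owner has dropped.
fn blocking_work(guard: StoppingGuard) -> u32 {
    let mut iterations = 0;
    for _ in 0..1000 {
        if guard.is_stopped() {
            break;
        }
        iterations += 1;
        thread::sleep(Duration::from_millis(1));
    }
    iterations
}

// The fix in miniature: create the guard on the outside, clone it into the
// worker, and drop the outer copy to simulate the timeout path.
fn run_with_early_cancel() -> u32 {
    let outer = StoppingGuard::new();
    let inner = outer.clone();
    let handle = thread::spawn(move || blocking_work(inner));
    thread::sleep(Duration::from_millis(20));
    drop(outer); // early return on the async side: flag flips for all clones
    handle.join().unwrap()
}
```

The key point matches the review: if the only guard lives inside the worker closure, nothing on the outside can ever flip the flag, so the early-return path has no effect on the blocking code.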

@agourlay agourlay added this to the Better read timeouts milestone Nov 24, 2025
@agourlay agourlay force-pushed the timeout-cancels-metrics-telemetry branch from f5fb11b to d5af900 Compare November 24, 2025 17:09
@agourlay agourlay requested a review from timvisee November 24, 2025 17:12
coderabbitai[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

Member

@timvisee timvisee left a comment
Member

@timvisee timvisee left a comment

Nice! I like that this handles both cancellation flavors - aborting the request and a timeout.

I think we should adjust how we bridge cancellation between sync/async code, but I'd be happy with settling on that in a separate PR.

&self,
detail: TelemetryDetail,
timeout: Duration,
is_stopped_guard: &StoppingGuard,
Member

I don't think it's correct to use a stopped guard here.

Since this is an async function, I'd argue the right way of aborting it is to drop it. I only expect to see an actual stop guard/structure where we switch to sync code. We have some utilities in lib/common/cancel to help with this, specifically to bridge async/sync cancellation.

I'm fine with adjusting this behavior in a separate PR.
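The point that dropping is the cancellation mechanism for async code can be shown without any executor: a future's captured state is destroyed the moment the future is dropped, even if it was never polled. The `SetOnDrop` guard here is hypothetical, a stand-in for whatever bridging structure `lib/common/cancel` provides.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical guard: sets a shared flag when dropped.
struct SetOnDrop(Arc<AtomicBool>);

impl Drop for SetOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::Relaxed);
    }
}

// An async fn that owns a guard for its whole lifetime.
async fn guarded_work(_guard: SetOnDrop) {
    std::future::pending::<()>().await; // would never complete on its own
}

fn drop_cancels() -> bool {
    let cancelled = Arc::new(AtomicBool::new(false));
    let future = guarded_work(SetOnDrop(Arc::clone(&cancelled)));
    // Dropping the future is async cancellation: its captured state is
    // destroyed, so the guard's Drop runs and the flag flips.
    drop(future);
    cancelled.load(Ordering::Relaxed)
}
```

This is why an explicit stop guard is only needed at the boundary where sync code begins: the async side already gets cancellation "for free" from drop semantics.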

Member Author

I think you are correct 👍

I will fix this in a follow up PR to not hold the release.

@qdrant qdrant deleted a comment from coderabbitai bot Nov 25, 2025
@agourlay agourlay force-pushed the timeout-cancels-metrics-telemetry branch from 71d0572 to 7be293b Compare November 25, 2025 11:21
coderabbitai[bot]

This comment was marked as resolved.

@qdrant qdrant deleted a comment from coderabbitai bot Nov 25, 2025
@timvisee timvisee merged commit 8292971 into dev Nov 25, 2025
16 checks passed
@timvisee timvisee deleted the timeout-cancels-metrics-telemetry branch November 25, 2025 11:56
timvisee added a commit that referenced this pull request Nov 25, 2025
* Add timeout to cancel metrics and telemetry calls

* relax constraints

* Configure timeout default in OpenAPI schema

---------

Co-authored-by: timvisee <tim@visee.me>
@timvisee timvisee mentioned this pull request Nov 25, 2025
};
use crate::settings::Settings;

const DEFAULT_TELEMETRY_TIMEOUT: Duration = Duration::from_secs(60);
Member

I'm late to the party, but here's my nit:

Add cross-reference comments so we won't forget update both places if we change this constant.

E.g.:

// Keep in sync with openapi/openapi-service.ytt.yaml
const DEFAULT_TELEMETRY_TIMEOUT: Duration = …;

openapi/openapi-service.ytt.yaml:

  /metrics:
    get:
      
      parameters:
        
        - name: timeout
          in: query
          description: "Timeout for this request"
          required: false
          schema:
            type: integer
            minimum: 1
            default: 60 # Keep in sync with DEFAULT_TELEMETRY_TIMEOUT

(worded in a way that rg DEFAULT_TELEMETRY_TIMEOUT yields both places)

Perhaps the same for the minimum: 1/#[validate(range(min = 1))].
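Beyond cross-reference comments, the nit could also be backed mechanically by a unit test that compares the constant against the schema's default. This is only a sketch: the YAML fragment is inlined here rather than read from `openapi/openapi-service.ytt.yaml`, and the extraction is a naive string match, not a real YAML parse.

```rust
use std::time::Duration;

// Keep in sync with openapi/openapi-service.ytt.yaml
const DEFAULT_TELEMETRY_TIMEOUT: Duration = Duration::from_secs(60);

// In a real test this would be read from the schema file; inlined here
// so the sketch is self-contained.
const SCHEMA_SNIPPET: &str = "
            type: integer
            minimum: 1
            default: 60 # Keep in sync with DEFAULT_TELEMETRY_TIMEOUT
";

// Naive extraction: find the `default:` line and parse the value that
// precedes any trailing comment.
fn schema_default_secs() -> Option<u64> {
    SCHEMA_SNIPPET
        .lines()
        .find_map(|line| line.trim().strip_prefix("default:"))
        .and_then(|rest| rest.split('#').next())
        .and_then(|value| value.trim().parse().ok())
}
```

A test asserting `schema_default_secs() == Some(DEFAULT_TELEMETRY_TIMEOUT.as_secs())` would then fail whenever one side is changed without the other, which is a stronger guarantee than a comment alone.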
