Add timeout to cancel metrics and telemetry calls #7579
Actionable comments posted: 1
🧹 Nitpick comments (10)
lib/storage/src/content_manager/toc/telemetry.rs (1)
65-82: Consider checking `is_stopped_guard` before snapshot telemetry collection.

While snapshot telemetry collection is likely fast (reading atomic counters), checking `is_stopped_guard` before line 65 would make cancellation behavior more consistent throughout the function. Apply this diff to add the check:

```diff
+ if is_stopped_guard.is_stopped() {
+     return Ok(TocTelemetryData {
+         collection_telemetry,
+         snapshot_telemetry: Vec::new(),
+     });
+ }
  let snapshot_telemetry: Vec<_> = self
```

openapi/openapi-service.ytt.yaml (2)
42-49: Clarify timeout units in the parameter description.

The timeout parameter description should specify that the value is in seconds to avoid ambiguity for API consumers. Consider updating the description to:

```diff
- description: "Timeout for this request"
+ description: "Timeout for this request in seconds"
```
66-73: Clarify timeout units in the parameter description.

The timeout parameter description should specify that the value is in seconds to avoid ambiguity for API consumers. Consider updating the description to:

```diff
- description: "Timeout for this request"
+ description: "Timeout for this request in seconds"
```

docs/redoc/master/openapi.json (2)
247-257: Clarify units/range and add example for timeout (consistency with other endpoints).

Current description is vague. Recommend explicitly stating seconds, allowed range, and default behavior; add an example for better SDK docs.

```diff
 {
   "name": "timeout",
   "in": "query",
-  "description": "Timeout for this request",
+  "description": "If set, overrides the internal timeout for this request. Unit is seconds. Allowed range: 1–10. If omitted, the server default is used.",
   "required": false,
   "schema": {
     "type": "integer",
     "minimum": 1,
-    "maximum": 10
-  }
+    "maximum": 10
+  },
+  "example": 5
 }
```
337-347: Mirror telemetry timeout wording for metrics; add example.

Keep the same phrasing and example to avoid ambiguity and ease client generation.

```diff
 {
   "name": "timeout",
   "in": "query",
-  "description": "Timeout for this request",
+  "description": "If set, overrides the internal timeout for this request. Unit is seconds. Allowed range: 1–10. If omitted, the server default is used.",
   "required": false,
   "schema": {
     "type": "integer",
     "minimum": 1,
-    "maximum": 10
-  }
+    "maximum": 10
+  },
+  "example": 5
 }
```

src/common/telemetry_reporting.rs (1)
7-7: Telemetry reporter error handling and result type look consistent

`TelemetryReporter::report` now uses `StorageResult<()>`, propagates failures from `prepare_data`, JSON serialization, and HTTP I/O, and logs non-success HTTP statuses while still returning `Ok(())`. This matches a best-effort, background reporting model and aligns with the new fallible telemetry plumbing. Passing `None` as timeout to `prepare_data` is also reasonable if this path intentionally avoids the API timeout constraints.

If you want producer telemetry to respect the same internal timeout as the public telemetry endpoint, consider threading a concrete `Duration` here instead of `None` so this background task can't compete indefinitely with foreground workloads.

Also applies to: 40-63
lib/collection/src/collection/telemetry.rs (1)
1-2: Collection telemetry now correctly participates in timeout/cancellation

The updated `Collection::get_telemetry_data` signature and body cleanly propagate `timeout` and `StoppingGuard` down to shards and return `CollectionResult<CollectionTelemetry>`, letting shard-level timeouts bubble up while preserving the existing telemetry shape. This is consistent with the new timeout-aware shard APIs.

If you ever see this become a bottleneck with many shards, consider parallelizing the per-shard `get_telemetry_data` calls under the read lock, or snapshotting the shard list first and then gathering telemetry concurrently, while still honoring the same timeout and stopping guard semantics.

Also applies to: 4-4, 7-7, 11-16, 22-26, 44-52
lib/collection/src/shards/shard.rs (1)
4-4: Shard::get_telemetry_data correctly unifies timeout/guard-aware telemetry retrieval

The Shard enum's `get_telemetry_data` now consistently:

- Accepts `timeout: Duration` and `&StoppingGuard`.
- Delegates to each concrete shard's timeout/guard-aware telemetry method, propagating `CollectionResult` errors.
- Preserves the existing `status` enrichment for local shards and `variant_name` tagging for all variants.

This is a clean centralization of the new telemetry behavior with no obvious edge-case regressions.

If `local_shard_status()` ever becomes a source of blocking or contention, you may later want to make it timeout/guard-aware as well (or at least guard it with the same `StoppingGuard`), but it's reasonable to keep it straightforward for now.

Also applies to: 12-12, 75-111
lib/collection/src/shards/replica_set/telemetry.rs (1)
13-51: Replica set telemetry timeout/guard propagation looks consistent; consider remotes

The replica-set `get_telemetry_data` correctly threads `timeout` and `StoppingGuard` into the local shard and wraps everything in `CollectionResult`, so local timeouts/cancellations can bubble up cleanly. The remote side still calls `remote.get_telemetry_data(detail)` without timeout/guard; if remote telemetry can ever block on IO or long-held locks, you may want to extend that API similarly so a timed-out or cancelled overall telemetry request doesn't keep waiting on remotes.

src/actix/api/service_api.rs (1)
85-127: Metrics timeout/error handling looks correct; consider improving timing measurement

The metrics handler now treats `prepare_data` as fallible and routes failures through `process_response_error`, while only anonymizing and formatting metrics on the success path, which aligns with the new timeout behavior. One small improvement: to get a more accurate elapsed time in error logs, you might want to capture `let timing = Instant::now();` once near the top of the handler and reuse it in both the access check failure and the `Err(err)` branch instead of calling `Instant::now()` at the error site.

```diff
 #[get("/metrics")]
 async fn metrics(
     telemetry_collector: Data<Mutex<TelemetryCollector>>,
     params: Query<MetricsParam>,
     config: Data<ServiceConfig>,
     ActixAccess(access): ActixAccess,
 ) -> HttpResponse {
-    if let Err(err) = access.check_global_access(AccessRequirements::new()) {
-        return process_response_error(err, Instant::now(), None);
-    }
+    let timing = Instant::now();
+    if let Err(err) = access.check_global_access(AccessRequirements::new()) {
+        return process_response_error(err, timing, None);
+    }
@@
-    match telemetry_data {
-        Err(err) => process_response_error(err, Instant::now(), None),
+    match telemetry_data {
+        Err(err) => process_response_error(err, timing, None),
         Ok(telemetry_data) => {
             let telemetry_data = if anonymize {
                 telemetry_data.anonymize()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
- docs/redoc/master/openapi.json (2 hunks)
- lib/collection/src/collection/telemetry.rs (2 hunks)
- lib/collection/src/shards/forward_proxy_shard.rs (2 hunks)
- lib/collection/src/shards/local_shard/telemetry.rs (3 hunks)
- lib/collection/src/shards/proxy_shard.rs (2 hunks)
- lib/collection/src/shards/queue_proxy_shard.rs (2 hunks)
- lib/collection/src/shards/replica_set/telemetry.rs (2 hunks)
- lib/collection/src/shards/shard.rs (2 hunks)
- lib/collection/src/shards/telemetry.rs (1 hunks)
- lib/shard/src/common/stopping_guard.rs (2 hunks)
- lib/storage/src/content_manager/errors.rs (1 hunks)
- lib/storage/src/content_manager/toc/telemetry.rs (3 hunks)
- openapi/openapi-service.ytt.yaml (2 hunks)
- src/actix/api/service_api.rs (5 hunks)
- src/actix/helpers.rs (3 hunks)
- src/common/telemetry.rs (4 hunks)
- src/common/telemetry_ops/collections_telemetry.rs (4 hunks)
- src/common/telemetry_reporting.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-10-13T22:58:03.121Z
Learnt from: generall
Repo: qdrant/qdrant PR: 7400
File: lib/segment/src/id_tracker/simple_id_tracker.rs:234-241
Timestamp: 2025-10-13T22:58:03.121Z
Learning: SimpleIdTracker in lib/segment/src/id_tracker/simple_id_tracker.rs is being deprecated and should not receive fixes related to version tracking or recovery logic, as it has a different version storage structure that is incompatible with newer trackers.
Applied to files:
lib/collection/src/shards/shard.rs
📚 Learning: 2025-07-11T11:35:21.549Z
Learnt from: generall
Repo: qdrant/qdrant PR: 6854
File: lib/segment/src/index/query_estimator.rs:320-327
Timestamp: 2025-07-11T11:35:21.549Z
Learning: In test code for Qdrant's query estimator (lib/segment/src/index/query_estimator.rs), simplified ID resolution logic using `id.to_string().parse().unwrap()` is acceptable for testing purposes and doesn't need to match production code's `id_tracker.internal_id()` approach. Test code can use mock implementations that serve the testing goals.
Applied to files:
lib/collection/src/shards/shard.rs
📚 Learning: 2025-09-01T11:42:06.964Z
Learnt from: timvisee
Repo: qdrant/qdrant PR: 7157
File: lib/shard/src/segment_holder/mod.rs:808-814
Timestamp: 2025-09-01T11:42:06.964Z
Learning: In Qdrant's segment holder, panicking when no segments exist during flush_all is intentional and preferred over graceful error handling, as having zero segments could permanently corrupt the WAL by acknowledging u64::MAX. The maintainers consider this condition impossible and use the panic as a fail-fast safety mechanism to prevent data corruption.
Applied to files:
lib/collection/src/shards/shard.rs
📚 Learning: 2025-08-10T18:30:02.986Z
Learnt from: generall
Repo: qdrant/qdrant PR: 7006
File: lib/collection/src/operations/verification/update.rs:158-174
Timestamp: 2025-08-10T18:30:02.986Z
Learning: In Qdrant's strict mode verification code (lib/collection/src/operations/verification/update.rs), exhaustive pattern matching without `..` is intentionally used for structs like PointsBatch and PointsList. This design pattern ensures compilation fails when new fields are added, forcing developers to explicitly consider how new fields should be handled in the indexed_filter_write method. This provides visibility and compile-time safety for struct evolution.
Applied to files:
lib/collection/src/shards/proxy_shard.rs
lib/collection/src/shards/forward_proxy_shard.rs
src/common/telemetry.rs
🧬 Code graph analysis (10)
lib/collection/src/shards/queue_proxy_shard.rs (3)
- lib/collection/src/shards/forward_proxy_shard.rs (1): get_telemetry_data (360-369)
- lib/collection/src/shards/proxy_shard.rs (1): get_telemetry_data (148-157)
- lib/collection/src/shards/shard.rs (1): get_telemetry_data (75-111)

lib/collection/src/shards/shard.rs (8)
- lib/collection/src/collection/telemetry.rs (1): get_telemetry_data (11-53)
- lib/collection/src/shards/forward_proxy_shard.rs (1): get_telemetry_data (360-369)
- lib/collection/src/shards/proxy_shard.rs (1): get_telemetry_data (148-157)
- lib/collection/src/shards/queue_proxy_shard.rs (1): get_telemetry_data (190-200)
- lib/storage/src/content_manager/toc/telemetry.rs (1): get_telemetry_data (43-88)
- lib/collection/src/shards/dummy_shard.rs (1): get_telemetry_data (63-78)
- lib/segment/src/segment/entry.rs (1): get_telemetry_data (881-898)
- lib/collection/src/shards/remote_shard.rs (1): get_telemetry_data (202-215)

lib/collection/src/shards/proxy_shard.rs (7)
- lib/collection/src/shards/forward_proxy_shard.rs (1): get_telemetry_data (360-369)
- lib/collection/src/shards/queue_proxy_shard.rs (1): get_telemetry_data (190-200)
- lib/collection/src/shards/replica_set/telemetry.rs (1): get_telemetry_data (13-51)
- lib/collection/src/shards/shard.rs (1): get_telemetry_data (75-111)
- lib/segment/src/index/hnsw_index/hnsw.rs (1): get_telemetry_data (1458-1472)
- lib/segment/src/segment/entry.rs (1): get_telemetry_data (881-898)
- lib/segment/src/index/vector_index_base.rs (2): get_telemetry_data (36-36), get_telemetry_data (241-261)

lib/collection/src/shards/forward_proxy_shard.rs (4)
- lib/collection/src/shards/local_shard/telemetry.rs (1): get_telemetry_data (21-118)
- lib/collection/src/shards/proxy_shard.rs (1): get_telemetry_data (148-157)
- lib/collection/src/shards/queue_proxy_shard.rs (1): get_telemetry_data (190-200)
- lib/collection/src/shards/shard.rs (1): get_telemetry_data (75-111)

lib/storage/src/content_manager/toc/telemetry.rs (1)
- lib/shard/src/common/stopping_guard.rs (1): new (12-16)

lib/collection/src/shards/replica_set/telemetry.rs (4)
- lib/collection/src/shards/forward_proxy_shard.rs (1): get_telemetry_data (360-369)
- lib/collection/src/shards/proxy_shard.rs (1): get_telemetry_data (148-157)
- lib/collection/src/shards/queue_proxy_shard.rs (1): get_telemetry_data (190-200)
- lib/collection/src/shards/shard.rs (1): get_telemetry_data (75-111)

src/common/telemetry.rs (3)
- lib/collection/src/operations/verification/mod.rs (1): new_unchecked_verification_pass (26-28)
- lib/storage/src/content_manager/errors.rs (1): timeout (103-110)
- lib/shard/src/common/stopping_guard.rs (1): new (12-16)

src/actix/api/service_api.rs (3)
- src/actix/helpers.rs (2): time (141-147), process_response_error (119-125)
- src/actix/api/collections_api.rs (1): timeout (29-31)
- src/actix/api/read_params.rs (1): timeout (19-21)

lib/storage/src/content_manager/errors.rs (1)
- lib/collection/src/operations/types.rs (5): service_error (944-949), bad_request (967-971), bad_input (951-955), not_found (957-959), timeout (935-942)

lib/collection/src/shards/local_shard/telemetry.rs (4)
- lib/segment/src/vector_storage/common.rs (1): get_async_scorer (12-14)
- lib/collection/src/shards/shard.rs (1): get_telemetry_data (75-111)
- lib/storage/src/content_manager/errors.rs (1): timeout (103-110)
- lib/common/common/src/types.rs (1): default (68-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: e2e-tests
- GitHub Check: test-shard-snapshot-api-s3-minio
- GitHub Check: integration-tests-consensus
- GitHub Check: integration-tests
- GitHub Check: test-consistency
- GitHub Check: rust-tests (windows-latest)
- GitHub Check: test-consensus-compose
- GitHub Check: rust-tests-no-rocksdb (ubuntu-latest)
- GitHub Check: rust-tests (macos-latest)
- GitHub Check: rust-tests (ubuntu-latest)
- GitHub Check: lint
- GitHub Check: Build Qdrant Edge Python bindings
🔇 Additional comments (19)
lib/storage/src/content_manager/toc/telemetry.rs (4)
3-3: LGTM! Imports align with timeout and cancellation support.

The new imports for `Duration`, `CollectionResult`, and `StoppingGuard` are appropriate for the timeout and cancellation functionality being added.

Also applies to: 5-5, 12-12
43-49: LGTM! Method signature correctly supports timeout and cancellation.

The updated signature properly accepts `timeout` and `is_stopped_guard` parameters and returns `CollectionResult` to allow timeout errors to propagate.
51-62: LGTM! Proper cancellation and timeout propagation.

The early break when `is_stopped_guard.is_stopped()` allows graceful cancellation with partial results, and the timeout and stopping guard are correctly propagated to `collection.get_telemetry_data()`.
90-106: The suggested fix is incompatible with the method signature.

The collection method `get_aggregated_telemetry_data()` at lib/collection/src/collection/mod.rs:786 only accepts `&self` and returns `CollectionsAggregatedTelemetry`. It does not support `timeout` or `is_stopped_guard` parameters, unlike `get_telemetry_data()`, which does accept these parameters (at lib/collection/src/collection/telemetry.rs:11).

The design intentionally differs between the two methods. The review comment's suggested diff to pass `timeout` and `is_stopped_guard` to `get_aggregated_telemetry_data()` would fail compilation because the method signature does not support these parameters.

Likely an incorrect or invalid review comment.
lib/shard/src/common/stopping_guard.rs (2)

5-5: LGTM!

Adding `Clone` to `StoppingGuard` is appropriate since it wraps an `Arc<AtomicBool>`, enabling the guard to be shared across multiple telemetry collection tasks.

22-24: LGTM!

The `is_stopped()` method using `Relaxed` ordering is appropriate for this cancellation flag use case, consistent with the existing `Drop` implementation.

lib/collection/src/shards/queue_proxy_shard.rs (1)
190-200: LGTM!

The updated signature correctly aligns with the timeout and cancellation patterns implemented across other shard types (ForwardProxyShard, ProxyShard, LocalShard). The method properly forwards the timeout and stopping guard to the wrapped shard and returns a `CollectionResult` for error propagation.

lib/collection/src/shards/telemetry.rs (1)
38-38: LGTM!

Adding `Default` to `LocalShardTelemetry` is safe and useful. All fields are either `Option` types or have `Default` implementations (e.g., `OptimizerTelemetry` on line 77), making this a sensible addition for initializing telemetry structs.

src/actix/helpers.rs (1)
12-12: LGTM!

Switching to the `StorageResult<T>` type alias improves consistency with the storage crate conventions. This is semantically equivalent to the previous `Result<T, StorageError>` signature and causes no functional change.

Also applies to: 143-143, 153-153
src/common/telemetry_reporting.rs (1)
69-71: Logging on report failures is appropriate

Wrapping `report` in `if let Err(err)` and logging the error keeps the hourly loop resilient while surfacing genuine failures (serialization/HTTP errors) without crashing the background task.

lib/collection/src/shards/proxy_shard.rs (1)
18-18: ProxyShard telemetry forwarding matches the new shard telemetry contract

`ProxyShard::get_telemetry_data` correctly mirrors the other proxy variants by accepting `(detail, timeout, &StoppingGuard)` and forwarding all three to the wrapped `LocalShard`, returning `CollectionResult<LocalShardTelemetry>`. This keeps the shard enum implementation simple and consistent.

Also applies to: 148-157
lib/collection/src/shards/forward_proxy_shard.rs (1)
17-17: ForwardProxyShard telemetry timeout/guard wiring is correct

The new `get_telemetry_data` method cleanly forwards `detail`, `timeout`, and `is_stopped_guard` to the wrapped `LocalShard`, returning `CollectionResult<LocalShardTelemetry>`. This keeps the forward proxy in line with other shard variants and integrates it into the timeout/cancellation pipeline.

Also applies to: 360-369
src/common/telemetry_ops/collections_telemetry.rs (1)
36-80: Collections telemetry orchestration with timeout/guard looks consistent

The revised `CollectionsTelemetry::collect` cleanly propagates the new `timeout` and `StoppingGuard` down into `TableOfContent::get_telemetry_data` and `get_aggregated_telemetry_data`, and now returns `CollectionResult<Self>`. This keeps the level-based behavior intact (full vs aggregated telemetry) while letting shard/collection timeouts and cancellations bubble up through a single `CollectionResult` path.

lib/storage/src/content_manager/errors.rs (1)
53-111: StorageError constructors and timeout helper are consistent

Returning `Self` from the helper constructors and adding `StorageError::timeout(timeout_sec, operation)` aligns this module with the existing `CollectionError` helpers and simplifies error construction. The new `timeout` variant is a good fit for surfacing telemetry/metrics timeouts with a clear, formatted description.

lib/collection/src/shards/local_shard/telemetry.rs (1)
21-70: Remove incorrect review concerns; implementation handles error conversion and type conversion appropriately

The original review contains two significant errors:

1. JoinError conversion exists: the codebase already has `impl From<JoinError> for CollectionError` at lib/collection/src/operations/types.rs:1179, so `AbortOnDropHandle::new(handle).await?` will compile and work correctly. The flagged compilation risk does not exist.
2. HashMap wrapping is necessary, not redundant: `SizeStats::num_vectors_by_name` is defined as `TinyMap<VectorNameBuf, usize>`, not `HashMap`. The code at lines 94-117 correctly converts from `TinyMap` to `HashMap` with `HashMap::from(num_vectors_by_name)` before storing in `LocalShardTelemetry`. This is a required type conversion, not unnecessary wrapping.

The implementation's timeout handling, `spawn_blocking` usage, and `StoppingGuard` integration remain sound. No changes are needed.

src/actix/api/service_api.rs (4)
3-18: Timeout and validation imports are coherent and used correctly

The added imports for `Duration`, `Data`, `Query`, and `Validate` line up with the new timeout helpers and validated query extractors; everything appears used and scoped appropriately.

29-41: TelemetryParam timeout field and helper look sound

The `timeout: Option<u64>` with `#[validate(range(min = 1, max = 10))]` plus the `timeout(&self) -> Option<Duration>` helper gives a clear 1-10s bound while keeping the conversion logic simple. This mirrors the existing pattern in other APIs and should behave well with `actix_web_validator::Query` automatically enforcing the range.
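The parameter shape described above can be sketched with plain `std` types. This is a hypothetical stand-in for the actix/validator-based struct (the real handler relies on `actix_web_validator::Query` and the validator derive); only the 1-10 second range and the `Duration` conversion are taken from the review:

```rust
use std::time::Duration;

// Illustrative stand-in for the query-parameter struct discussed above.
struct TelemetryParam {
    timeout: Option<u64>,
}

impl TelemetryParam {
    // Manual stand-in for `#[validate(range(min = 1, max = 10))]`,
    // which the framework would normally enforce on extraction.
    fn validate(&self) -> Result<(), String> {
        match self.timeout {
            Some(secs) if !(1..=10).contains(&secs) => {
                Err(format!("timeout must be within 1..=10, got {secs}"))
            }
            _ => Ok(()),
        }
    }

    // Mirrors the `timeout(&self) -> Option<Duration>` helper.
    fn timeout(&self) -> Option<Duration> {
        self.timeout.map(Duration::from_secs)
    }
}

fn main() {
    let p = TelemetryParam { timeout: Some(5) };
    assert!(p.validate().is_ok());
    assert_eq!(p.timeout(), Some(Duration::from_secs(5)));

    // Out-of-range values are rejected; an unset timeout passes through as None.
    assert!(TelemetryParam { timeout: Some(30) }.validate().is_err());
    assert_eq!(TelemetryParam { timeout: None }.timeout(), None);
    println!("param sketch ok");
}
```

Keeping validation on the raw `u64` and converting to `Duration` only in the accessor matches the pattern the review describes for both `TelemetryParam` and `MetricsParam`.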
43-70: Telemetry handler correctly propagates timeout into telemetry collection

Using `Data<Mutex<TelemetryCollector>>` and calling `prepare_data(&access, detail, params.timeout()).await?` threads the optional timeout through without changing the anonymization logic. The error propagation via `?` inside `helpers::time` matches the pattern used for other fallible endpoints in this file.

72-83: MetricsParam timeout configuration is consistent with TelemetryParam

`MetricsParam` mirrors `TelemetryParam` with the same 1-10s validation and `timeout()` helper, which keeps behavior consistent across `/telemetry` and `/metrics` while keeping the mapping to `Duration` trivial.
src/common/telemetry.rs (reviewed hunk, post-change):

```rust
pub async fn prepare_data(
    &self,
    access: &Access,
    detail: TelemetryDetail,
    timeout: Option<Duration>,
) -> StorageResult<TelemetryData> {
    let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
    // Use blocking pool because the collection telemetry acquires several sync. locks.
    let collections_telemetry_handle = {
        let toc = self
            .dispatcher
            .toc(access, &new_unchecked_verification_pass())
            .clone();
        let runtime_handle = toc.general_runtime_handle().clone();
        let access_collection = access.clone();
        let is_stopped_guard = StoppingGuard::new();

        let handle = runtime_handle.spawn_blocking(move || {
            // Re-enter the async runtime in this blocking thread
            tokio::runtime::Handle::current().block_on(async move {
                CollectionsTelemetry::collect(
                    detail,
                    &access_collection,
                    &toc,
                    timeout,
                    &is_stopped_guard,
                )
                .await
            })
        });
        AbortOnDropHandle::new(handle)
    };

    let collections_telemetry = tokio::time::timeout(timeout, collections_telemetry_handle)
        .await
        .map_err(|_: Elapsed| {
            StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
        })???;
```
🧩 Analysis chain
prepare_data timeout/cancellation wiring: guard not observable from outside + triple-? readability
Two things to consider here:
-
StoppingGuard lifetime / effectiveness
StoppingGuard::new()is created inside thespawn_blockingclosure and moved into it:let is_stopped_guard = StoppingGuard::new(); let handle = runtime_handle.spawn_blocking(move || { // ... CollectionsTelemetry::collect( detail, &access_collection, &toc, timeout, &is_stopped_guard, ) .await })
Because there is no copy of the guard kept in
prepare_data, dropping the outer future (or hitting thetokio::time::timeout) does not, by itself, drop a guard that the blocking task can observe. Unless something else outside this call holds and drops a clone,is_stopped_guard.is_stopped()checks in deeper layers will never flip totrue, and blocking loops will only stop when they naturally complete.If the intent is to propagate the outer timeout/cancellation into blocking code via
StoppingGuard, it would be safer to:- create the guard outside,
- clone it into the blocking task,
- and rely on the outer guard being dropped on early return (
?on timeout/error) to signal “stop” to all clones.
For example:
- pub async fn prepare_data( - &self, - access: &Access, - detail: TelemetryDetail, - timeout: Option<Duration>, - ) -> StorageResult<TelemetryData> { - let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT); - // Use blocking pool because the collection telemetry acquires several sync. locks. - let collections_telemetry_handle = { + pub async fn prepare_data( + &self, + access: &Access, + detail: TelemetryDetail, + timeout: Option<Duration>, + ) -> StorageResult<TelemetryData> { + let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT); + let is_stopped_guard = StoppingGuard::new(); + // Use blocking pool because the collection telemetry acquires several sync. locks. + let collections_telemetry_handle = { let toc = self .dispatcher .toc(access, &new_unchecked_verification_pass()) .clone(); let runtime_handle = toc.general_runtime_handle().clone(); let access_collection = access.clone(); - let is_stopped_guard = StoppingGuard::new(); + let is_stopped_guard = is_stopped_guard.clone(); @@ - AbortOnDropHandle::new(handle) - }; - - let collections_telemetry = tokio::time::timeout(timeout, collections_telemetry_handle) - .await - .map_err(|_: Elapsed| { - StorageError::timeout(timeout.as_secs() as usize, "collections telemetry") - })???; + AbortOnDropHandle::new(handle) + }; + + let join_result = tokio::time::timeout(timeout, collections_telemetry_handle) + .await + .map_err(|_: Elapsed| { + StorageError::timeout(timeout.as_secs() as usize, "collections telemetry") + })?; + let collections_telemetry = join_result??;
This assumes
StoppingGuardsignals cancellation onDropand that deeper code periodically checksis_stopped(); if its semantics differ, the pattern may need to be adjusted. -
Triple
???clarityThe current
???chain is type‑correct (outer timeout error, then join error, then collection error), but it is non‑obvious at a glance. Splitting it intojoin_resultandcollections_telemetryas above keeps behavior identical while making the error flow much easier to follow.
🏁 Script executed:

```bash
#!/bin/bash
# Verify how StoppingGuard is meant to be used and whether it relies on Drop to signal cancellation.
rg -n "struct StoppingGuard" -S
rg -n "impl Drop for StoppingGuard" -S
rg -n "is_stopped\(\)" -S
```

Length of output: 2769
🏁 Script executed:

```bash
#!/bin/bash
# Examine StoppingGuard struct definition and implementation details
cat lib/shard/src/common/stopping_guard.rs
```

Length of output: 846
prepare_data timeout/cancellation wiring: guard not observable from outside + triple-? readability
Two things to consider here:
- StoppingGuard lifetime / effectiveness

  `StoppingGuard::new()` is created inside the `spawn_blocking` closure and moved into it. Because `StoppingGuard` uses `Arc<AtomicBool>` and derives `Clone`, all copies share the same flag. However, the guard created inside the blocking closure is owned only by the blocking task; when the outer `tokio::time::timeout` returns due to timeout or error, the guard is not dropped and therefore cannot signal cancellation to the blocking code via its `Drop` implementation.

  If the intent is to propagate the outer timeout/cancellation into blocking code via `StoppingGuard`, move the guard creation outside and clone it into the blocking task. This way, dropping the outer guard on early return (due to timeout or error) will set `is_stopped` to `true`, and all inner clones will immediately see this change.

  For example:

  ```diff
   pub async fn prepare_data(
       &self,
       access: &Access,
       detail: TelemetryDetail,
       timeout: Option<Duration>,
   ) -> StorageResult<TelemetryData> {
       let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
  +    let is_stopped_guard = StoppingGuard::new();
       // Use blocking pool because the collection telemetry acquires several sync. locks.
       let collections_telemetry_handle = {
           let toc = self
               .dispatcher
               .toc(access, &new_unchecked_verification_pass())
               .clone();
           let runtime_handle = toc.general_runtime_handle().clone();
           let access_collection = access.clone();
  -        let is_stopped_guard = StoppingGuard::new();
  +        let is_stopped_guard = is_stopped_guard.clone();
  @@
           AbortOnDropHandle::new(handle)
       };

  -    let collections_telemetry = tokio::time::timeout(timeout, collections_telemetry_handle)
  -        .await
  -        .map_err(|_: Elapsed| {
  -            StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
  -        })???;
  +    let join_result = tokio::time::timeout(timeout, collections_telemetry_handle)
  +        .await
  +        .map_err(|_: Elapsed| {
  +            StorageError::timeout(timeout.as_secs() as usize, "collections telemetry")
  +        })?;
  +    let collections_telemetry = join_result??;
  ```
- Triple `???` clarity

  The current `???` chain is type-correct (outer timeout error, then join error, then collection error), but is non-obvious at a glance. Splitting as shown above keeps behavior identical while making the error flow much clearer.
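The layered `???` shape can be reproduced with nested `Result`s. The sketch below uses illustrative error types (not the real `Elapsed`, `JoinError`, or `StorageError`) to show that the chained and split forms are equivalent, and why naming each layer reads better:

```rust
// Illustrative error layers standing in for the timeout, join,
// and collection errors discussed above.
#[derive(Debug, PartialEq)]
struct TimeoutErr;
#[derive(Debug, PartialEq)]
struct JoinErr;
#[derive(Debug, PartialEq)]
struct CollectErr;

#[derive(Debug, PartialEq)]
enum StorageErr {
    Timeout,
    Join,
    Collect,
}

impl From<TimeoutErr> for StorageErr { fn from(_: TimeoutErr) -> Self { StorageErr::Timeout } }
impl From<JoinErr> for StorageErr { fn from(_: JoinErr) -> Self { StorageErr::Join } }
impl From<CollectErr> for StorageErr { fn from(_: CollectErr) -> Self { StorageErr::Collect } }

// Outer timeout result wraps a join result, which wraps the collection result.
type Layered = Result<Result<Result<u32, CollectErr>, JoinErr>, TimeoutErr>;

// One-liner: type-correct, but the three conversions are invisible.
fn chained(layered: Layered) -> Result<u32, StorageErr> {
    Ok(layered???)
}

// Split version: identical behavior, each error layer named.
fn split(layered: Layered) -> Result<u32, StorageErr> {
    let join_result = layered?;        // outer timeout error
    let collect_result = join_result?; // join error
    let value = collect_result?;       // collection error
    Ok(value)
}

fn main() {
    assert_eq!(chained(Ok(Ok(Ok(42)))), Ok(42));
    assert_eq!(split(Err(TimeoutErr)), Err(StorageErr::Timeout));
    assert_eq!(chained(Ok(Err(JoinErr))), Err(StorageErr::Join));
    println!("both forms agree");
}
```

Each `?` performs one `From` conversion into the outer error type, so the chain is sound; the split form simply makes the three hops visible.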
🤖 Prompt for AI Agents
In src/common/telemetry.rs around lines 80-118, move the StoppingGuard::new()
creation out of the spawn_blocking closure and clone it into the blocking task
so the outer guard is dropped on timeout/cancellation and can signal the inner
task via its Arc flag; then replace the single chained ??? at the timeout/await
with explicit, sequential error handling: first await tokio::time::timeout(...)
and map the Elapsed into a StorageError timeout, then await the spawned task /
JoinHandle and map any join error into an appropriate StorageError, and finally
propagate the collection error — this preserves behavior but makes the
cancellation observable from outside and the error flow clear.
timvisee left a comment:
Nice! I like that this handles both cancellation flavors - aborting the request and a timeout.
I think we should adjust how we bridge cancellation between sync/async code, but I'd be happy with settling on that in a separate PR.
```rust
    &self,
    detail: TelemetryDetail,
    timeout: Duration,
    is_stopped_guard: &StoppingGuard,
```
I don't think it's correct to use a stopped guard here.
Since this is an async function, I'd argue the right way of aborting it is to drop it. I only expect to see an actual stop guard/structure where we switch to sync code. We have some utilities in lib/common/cancel to help with this, specifically to bridge async/sync cancellation.
I'm fine with adjusting this behavior in a separate PR.
I think you are correct 👍
I will fix this in a follow up PR to not hold the release.
* Add timeout to cancel metrics and telemetry calls
* relax constraints
* Configure timeout default in OpenAPI schema

---------

Co-authored-by: timvisee <tim@visee.me>
```rust
};
use crate::settings::Settings;

const DEFAULT_TELEMETRY_TIMEOUT: Duration = Duration::from_secs(60);
```
I'm late to the party, but here's my nit:
Add cross-reference comments so we won't forget to update both places if we change this constant. E.g.:

```rust
// Keep in sync with openapi/openapi-service.ytt.yaml
const DEFAULT_TELEMETRY_TIMEOUT: Duration = …;
```

openapi/openapi-service.ytt.yaml:
```yaml
/metrics:
  get:
    …
    parameters:
      …
      - name: timeout
        in: query
        description: "Timeout for this request"
        required: false
        schema:
          type: integer
          minimum: 1
          default: 60  # Keep in sync with DEFAULT_TELEMETRY_TIMEOUT
```

(worded in a way that `rg DEFAULT_TELEMETRY_TIMEOUT` yields both places)

Perhaps the same for the `minimum: 1` / `#[validate(range(min = 1))]` pair.
This PR adds timeout support for the telemetry endpoint and the metrics endpoint.
The goal is to prevent potentially long-running computations from conflicting with other workloads.
This new timeout is now always present internally with a default value of 60 seconds.
It can be changed to any positive value via the API to handle extreme cases.
The telemetry/metrics workload cancellation works at different levels to juggle the sync. operations:
- `tokio::timeout` on the top-level future

Before this PR, the telemetry/metrics APIs were not fallible:
they would always return successfully at the cost of returning partial results.
Both APIs are now fallible to be able to bubble up timeouts internally.
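The overall mechanism, a top-level timeout turning a previously infallible call into a fallible one, can be sketched with `std` channels standing in for the tokio machinery (all names are illustrative, not the actual endpoint code):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Stand-in for a slow telemetry collection running on a blocking pool.
fn collect_telemetry(work: Duration) -> String {
    thread::sleep(work);
    "telemetry-data".to_string()
}

// Fallible wrapper: the caller now gets either the data or a timeout
// error, instead of always succeeding with possibly partial results.
fn collect_with_timeout(work: Duration, timeout: Duration) -> Result<String, String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the receiver may be gone after a timeout.
        let _ = tx.send(collect_telemetry(work));
    });
    rx.recv_timeout(timeout)
        .map_err(|_| format!("operation 'telemetry' timed out after {timeout:?}"))
}

fn main() {
    // Fast collection finishes within the deadline.
    let ok = collect_with_timeout(Duration::from_millis(10), Duration::from_millis(500));
    assert_eq!(ok.as_deref(), Ok("telemetry-data"));

    // Slow collection surfaces a timeout error to the caller.
    let err = collect_with_timeout(Duration::from_millis(500), Duration::from_millis(10));
    assert!(err.is_err());
    println!("timeout sketch ok");
}
```

In the real endpoints the same role is played by `tokio::time::timeout` wrapping the top-level future, with the `Elapsed` error mapped into a `StorageError::timeout` that the handler returns to the client.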