Skip to content

Timeout for snapshot stream#8166

Merged
generall merged 3 commits intodevfrom
stream-download-timeout
Feb 18, 2026
Merged

Timeout for snapshot stream#8166
generall merged 3 commits intodevfrom
stream-download-timeout

Conversation

@generall
Copy link
Member

[Ai]

In the `download_and_unpack_tar` function we download stream of bytes and unpack a tar archive, but it might happen that stream is stuck and downloading does not progress. Is it possible to              
  implement timeout for stream in a way, that if we don't read any bytes from stream in 60 seconds, we timeout the download function

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@lib/storage/src/content_manager/snapshots/download_tar.rs`:
- Line 56: Remove the debugging eprintln! call that prints buffer lengths (the
line containing eprintln!("buf.filled().len() = {:#?}", buf.filled().len());)
from download_tar.rs to avoid spamming stderr in production; if you still need
visibility for development, replace it with a conditional tracing/logging call
(e.g., use the crate's logger at trace/debug level) tied to the existing logging
framework so it can be enabled only during debugging.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
lib/storage/src/content_manager/snapshots/download_tar.rs (1)

225-260: Consider adding a test for the timeout behavior.

The existing test validates the happy path well. A test with a stalled/slow server (e.g., a mockito endpoint that delays its response body) would confirm the TimedOut error surfaces correctly. This can be deferred if you'd rather not add the complexity now.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/storage/src/content_manager/snapshots/download_tar.rs` around lines 225 -
260, Add a new async test in the tests module (alongside
test_download_and_unpack_tar) that verifies timeout behavior by spinning up a
mockito::Server::new_async() endpoint which intentionally stalls/delays its
response body, creating a reqwest::Client configured with a very short timeout,
calling download_and_unpack_tar with that client and the delayed URL, and
asserting the call returns an Err corresponding to the TimedOut error variant
(or the crate's equivalent) so the timeout path is exercised.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@lib/storage/src/content_manager/snapshots/download_tar.rs`:
- Around line 225-260: Add a new async test in the tests module (alongside
test_download_and_unpack_tar) that verifies timeout behavior by spinning up a
mockito::Server::new_async() endpoint which intentionally stalls/delays its
response body, creating a reqwest::Client configured with a very short timeout,
calling download_and_unpack_tar with that client and the delayed URL, and
asserting the call returns an Err corresponding to the TimedOut error variant
(or the crate's equivalent) so the timeout path is exercised.

@generall generall marked this pull request as draft February 17, 2026 20:28
@generall generall marked this pull request as ready for review February 18, 2026 00:47
Comment on lines -28 to +93
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"download cancelled",
));
return Err(std::io::Error::other("download cancelled"));
Copy link
Member Author

@generall generall Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is the most important in the PR.

Turns out, the Interrupted is a very poor choice, as it is interpreted as transient error and encouraged to be retried. From docs:

This operation was interrupted.
Interrupted operations can typically be retried.

Which is the opposite of cancelled. This caused Tar Unpack process to stuck in infinite loop, even after cancellation token was properly triggered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it is interpreted as transient error and encouraged to be retried.

😆

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
lib/storage/src/content_manager/snapshots/download_tar.rs (1)

264-320: No test covers the TimeoutReader inactivity timeout itself.

test_async_cancellation exercises the cancellation path; the TimeoutReader participates only as a passthrough (60 s never elapses). The core new behavior — a stalled stream producing ErrorKind::TimedOut — is untested. A targeted test using tokio::time::pause() / advance() would cover this without real delays, e.g. a stream that yields one byte then hangs indefinitely, advanced past 60 s.

Would you like me to draft a test_timeout_reader_stall test using tokio::time::pause() and advance()? Happy to open a new issue to track this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/storage/src/content_manager/snapshots/download_tar.rs` around lines 264 -
320, Add a focused test that exercises TimeoutReader's inactivity timeout:
create an async stream that yields one byte and then never yields again (e.g.,
futures::stream::iter once followed by futures::stream::pending()), call
tokio::time::pause() at test start, wrap the stream with StreamReader and
TimeoutReader (using STREAM_READ_TIMEOUT), attempt a read (or use
tokio::task::spawn_blocking with SyncIoBridge/CancellableReader if needed), then
advance the paused time past STREAM_READ_TIMEOUT with tokio::time::advance() and
assert the resulting error is std::io::ErrorKind::TimedOut; name the test
test_timeout_reader_stall and use the same helper types (TimeoutReader,
StreamReader, STREAM_READ_TIMEOUT) so it targets the inactivity path directly
without real delays.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@lib/storage/src/content_manager/snapshots/download_tar.rs`:
- Around line 301-310: The two tokio::select! arms have inverted semantics: the
sleep(Duration::from_millis(50)) branch is the expected success path that
confirms cancellation by dropping the handler, while the `_ = handler` branch is
the unexpected failure path where the task completed too soon. Update the
comments accordingly and make the `_ = handler` arm fail the test (e.g., panic!
with a clear message or assert) so a prematurely finishing `handler` surface as
an error; keep the sleep arm as the success case (you can keep the existing
eprintln! or replace it with a success log).

---

Nitpick comments:
In `@lib/storage/src/content_manager/snapshots/download_tar.rs`:
- Around line 264-320: Add a focused test that exercises TimeoutReader's
inactivity timeout: create an async stream that yields one byte and then never
yields again (e.g., futures::stream::iter once followed by
futures::stream::pending()), call tokio::time::pause() at test start, wrap the
stream with StreamReader and TimeoutReader (using STREAM_READ_TIMEOUT), attempt
a read (or use tokio::task::spawn_blocking with SyncIoBridge/CancellableReader
if needed), then advance the paused time past STREAM_READ_TIMEOUT with
tokio::time::advance() and assert the resulting error is
std::io::ErrorKind::TimedOut; name the test test_timeout_reader_stall and use
the same helper types (TimeoutReader, StreamReader, STREAM_READ_TIMEOUT) so it
targets the inactivity path directly without real delays.

Comment on lines +301 to +310
tokio::select! {
_ = handler => {
// Task finished on its own, which is unexpected in this test

}
_ = tokio::time::sleep(Duration::from_millis(50)) => {
// Timeout waiting for task to finish, which means cancellation likely failed
eprintln!("Cancelled");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

The select! arm comments are inverted — the timeout arm is the expected success path.

The 50 ms timeout branch is the one that actually triggers cancellation (by dropping the handler future), but its comment reads "which means cancellation likely failed", which is the opposite of the truth. The _ = handler arm (task finishes on its own in < 50 ms) is the unexpected path, yet it has no panic! or failing assertion.

🔧 Suggested correction
     tokio::select! {
         _ = handler => {
-            // Task finished on its own, which is unexpected in this test
-
+            // Task finished on its own before the timeout — cancellation was not exercised.
+            // The assertion below still passes, but this branch indicates the test may be invalid.
+            panic!("Handler completed without cancellation being triggered");
         }
          _ = tokio::time::sleep(Duration::from_millis(50)) => {
-            // Timeout waiting for task to finish, which means cancellation likely failed
-            eprintln!("Cancelled");
+            // Expected path: timeout fires, handler is dropped, cancellation is triggered.
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/storage/src/content_manager/snapshots/download_tar.rs` around lines 301 -
310, The two tokio::select! arms have inverted semantics: the
sleep(Duration::from_millis(50)) branch is the expected success path that
confirms cancellation by dropping the handler, while the `_ = handler` branch is
the unexpected failure path where the task completed too soon. Update the
comments accordingly and make the `_ = handler` arm fail the test (e.g., panic!
with a clear message or assert) so a prematurely finishing `handler` surface as
an error; keep the sleep arm as the success case (you can keep the existing
eprintln! or replace it with a success log).

@qdrant qdrant deleted a comment from coderabbitai bot Feb 18, 2026
Copy link
Member

@timvisee timvisee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the test as well.

Can we implement the same on the sender side?

@generall generall merged commit c7b6398 into dev Feb 18, 2026
17 checks passed
@generall generall deleted the stream-download-timeout branch February 18, 2026 10:04
timvisee pushed a commit that referenced this pull request Feb 18, 2026
* [AI] Imeplement timeout for reading snapshot stream

* remove debug code

* [no-AI] fix error type and cancellation test(not the one which actually found the problem)
@timvisee timvisee mentioned this pull request Feb 18, 2026
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants