
Progress reporting for downloads.#645

Merged
hoytak merged 14 commits into main from hoytak/260202-progress-stream
Feb 19, 2026

Conversation

@hoytak (Collaborator) commented Feb 10, 2026

This PR adds detailed progress reporting to the download path.

  • Transfer progress is reported as soon as the download streams start; actual bytes written are reported as the reconstructed file is written out.
  • Currently, each call to download_file creates a separate progress tracker, but this lays the groundwork for download groups with grouped download progress tracking.

To support this, the UploadProgressStream was split into three classes: a common StreamProgressReporter plus download- and upload-specific versions. This also allows us to simplify the API to RetryWrapper.

More tracking was added to the file reconstruction paths to properly report progress.

@rajatarya (Collaborator)

Review Notes

Reviewed the full diff (+1,824 / -661 across 42 files). The architecture is clean — separating transfer bytes (network) from decompressed bytes (disk) through independent call sites is the right approach, and extracting progress from RetryWrapper internals is a nice simplification.

Items worth discussing

  1. Relaxed atomics ordering — All counters use Relaxed. This is fine for monotonic counters, but report_bytes_written has debug_assert_le!(bytes_completed, item_total_bytes) where item_total_bytes is loaded with Relaxed from a different atomic that may be updated concurrently by update_item_size. Under weak memory ordering, the assert could theoretically see a stale item_total_bytes while seeing a current bytes_completed. Unlikely to fire in practice, but worth considering Acquire/Release on the total-size stores/loads if the asserts matter.

  2. Fire-and-forget tokio::spawn for progress updates — DownloadTaskUpdater sends all progress via tokio::spawn(async move { inner.register_updates(...).await }). If the runtime is shutting down or under pressure, these updates can be silently dropped. Fine for UI progress bars, but worth a comment documenting that this is intentional.

  3. cfg!(debug_assertions) default tracker in FileReconstructor::new() — In debug builds, every FileReconstructor creates a correctness_verification_tracker even when no progress tracking is needed, and assert_complete() runs at the end. This means debug-mode downloads are strictly validated (good for catching bugs), but it's a behavioral difference from release that's easy to miss. Consider a comment explaining the intent.

  4. determine_size_if_possible returning None — When the DP can't chain chunk ranges, remote_client.rs falls back to Vec::with_capacity(0), meaning no pre-allocation for the decompressed buffer. Is this the common or rare case? If common, might be worth a follow-up to handle it.

  5. merge_in logic change in progress_info.rs — The merge now conditionally adds other.total_bytes_increment when other.total_bytes == 0, instead of always taking max. This subtly changes behavior when both sides have non-zero totals with increments — worth verifying this matches the intended semantics.

  6. Unrelated changes — git-lfs availability guards in install/uninstall tests, sort_by → sort_by_key(Reverse(...)) in shard_file_manager.rs and logging.rs, and wasm formatting changes are mixed in. Minor, but could be split out.
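On item 1, a minimal sketch of the suggested Acquire/Release pairing between the two atomics (the struct and method names here are illustrative placeholders, not the PR's actual fields):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for the progress item discussed above.
struct ItemProgress {
    bytes_completed: AtomicU64,
    item_total_bytes: AtomicU64,
}

impl ItemProgress {
    fn new() -> Self {
        Self {
            bytes_completed: AtomicU64::new(0),
            item_total_bytes: AtomicU64::new(0),
        }
    }

    fn update_item_size(&self, total: u64) {
        // Release on this store pairs with the Acquire load in
        // report_bytes_written: when the size update happens-before the
        // write being reported, the assert sees a consistent total.
        self.item_total_bytes.store(total, Ordering::Release);
    }

    fn report_bytes_written(&self, n: u64) {
        // The counter itself stays Relaxed; a monotonic accumulator
        // needs no ordering guarantees of its own.
        let completed = self.bytes_completed.fetch_add(n, Ordering::Relaxed) + n;
        let total = self.item_total_bytes.load(Ordering::Acquire);
        debug_assert!(completed <= total, "completed {completed} > total {total}");
    }
}
```

This only strengthens the pair of atomics the assert compares; it does not (and need not) order them against unrelated progress counters.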

What looks good

  • StreamProgressReporter with fetch_max high-water-mark for retry dedup is elegant
  • RetryWrapper simplification (Fn(Option<Arc<dyn Fn(f64, u64)>>) → Fn()) reduces cognitive load across ~15 call sites
  • cas_client no longer depends on progress_tracking — good decoupling
  • Test coverage is solid: ~600 new test lines covering the DP algorithm (16 cases), DownloadTaskUpdater flows (14 cases), stream progress (4 cases), and end-to-end FileReconstructor progress (4 cases)
  • is_final semantics on update_item_size cleanly handle both the data_client (size-known-upfront) and reconstruction (incremental-discovery) paths
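The fetch_max high-water-mark idea called out above can be sketched as follows (a simplified stand-in, not the actual StreamProgressReporter code): on a retry the stream restarts from offset zero, and only bytes beyond the previous high-water mark count as new progress.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative sketch of retry dedup via a high-water mark.
struct HighWaterMark {
    reported: AtomicU64,
}

impl HighWaterMark {
    fn new() -> Self {
        Self { reported: AtomicU64::new(0) }
    }

    /// Given the absolute offset reached on the current (possibly
    /// retried) attempt, return how many bytes are newly reported.
    fn report(&self, offset: u64) -> u64 {
        // fetch_max returns the previous maximum; anything at or below
        // it has already been counted on an earlier attempt.
        let prev = self.reported.fetch_max(offset, Ordering::Relaxed);
        offset.saturating_sub(prev)
    }
}
```

Calling report(100), then report(50) after a retry restart, then report(150) yields 100, 0, and 50 newly-reported bytes respectively, so progress never double-counts or regresses.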

@rajatarya (Collaborator)

(I'm still working through Claude-assisted review of this PR - will update with more comments/questions as I get through it. But this summary raises some good suggestions around adding comments.)

@hoytak (Collaborator, Author) commented Feb 11, 2026

Good feedback; updated in response to a couple of the items (the atomic ordering, the merge_in change, and the unrelated changes).

@rajatarya (Collaborator) left a comment:

Love seeing these changes. Did as thorough a review as I could with all the file changes - and overall I LOVE these changes. Minor comments on adding docstrings and comments.

}
}

pub fn correctness_verification_tracker() -> Arc<Self> {
@rajatarya:

As a debug only checker, can this function be annotated that way?

pub xorb_hash: MerkleHash,
pub chunk_range: ChunkRange,
pub xorb_block_index: usize,
/// All file-term chunk ranges covered by this xorb block, sorted by range start.
@rajatarya:

Needs a comment for the pub struct here.

progress_updater: None,
progress_updater: {
if cfg!(debug_assertions) {
Some(DownloadTaskUpdater::correctness_verification_tracker())
@rajatarya:

nit: I'd love it if there was a comment explaining that this is a test hook for unit-tests and is compiled out.


#[cfg(debug_assertions)]
{
let refs = &file_term.xorb_block.references;
@rajatarya:

It would be good if there was a comment describing this assert. Not sure if folks down the road will remember why these debug assertions exist.

reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
}

// ==================== Progress tracker tests ====================
@rajatarya:

Love seeing all the new tests!

let self_ = Arc::new(self);
let try_count = AtomicUsize::new(0);

// Extract the partial progress reporting function from the connection permit if it exists
@rajatarya:

Love seeing this progress reporting getting pulled out of retry wrapping. Definitely cleaner abstractions this way!

match result {
Ok((_compressed_len, chunk_byte_indices)) => {
if let Some(expected) = uncompressed_size_if_known {
debug_assert_eq!(
@rajatarya:

I think this should log error!() or warn!() if the expected bytes are not returned.

This code still returns Ok() for that case, does that mess things later on downstream?

@hoytak (Author):

In this case, it could mean that the remote data is corrupted somehow, so I'm not sure what to do at that point. It's one of the things that should never happen. Practically, however, it will still use the remote data as reference and proceed as normal...

serialized_data.clone(),
block_size,
upload_reporter.clone(),
);
@rajatarya:

Love seeing the progress reporting have its own clean abstraction and having Upload and Download have their own ProgressStream implementations.

@hoytak hoytak merged commit 5d6371a into main Feb 19, 2026
7 checks passed
@hoytak hoytak deleted the hoytak/260202-progress-stream branch February 19, 2026 19:06
rajatarya pushed a commit that referenced this pull request Feb 27, 2026
## Summary

Wrap download progress updaters in `AggregatingProgressUpdater` to
eliminate GIL contention when Python callers provide per-file progress
callbacks.

The upload path has had this aggregation since v1.1.3 (PR #340), but the
download path was missed. Without aggregation, each XORB chunk triggers
a `spawn_blocking` + `Python::with_gil()` callback. With many concurrent
file downloads, this causes severe GIL contention — measured as a **4x
throughput reduction** (3000 MB/s → 750 MB/s on a 25 Gbps link).

The fix wraps the caller-provided `TrackingProgressUpdater` in an
`AggregatingProgressUpdater` (200ms flush interval) inside
`download_file_with_updater()`, matching the pattern already used by
`FileUploadSession`. This reduces Python callback frequency from
thousands/sec to ~5/sec per file while preserving progress bar feedback.

## Root cause

When `huggingface_hub` calls `hf_xet.download_files()`, it passes a
per-file Python callback for progress bar updates. On the Rust side,
each callback invocation goes through:

```
report_bytes_written() / report_transfer_progress()
  → tokio::spawn(register_updates())
    → spawn_blocking(Python::with_gil(callback))
```

With the detailed download progress tracking added in PR #645 (hf-xet
v1.3.0), both `report_bytes_written` and `report_transfer_progress` fire
per chunk, roughly doubling callback frequency. With 8+ concurrent file
downloads, each spawning dozens of concurrent XORB streams, the GIL
becomes a severe bottleneck.

## History

The problem has existed since xet download support was introduced, but
worsened over time:

| Version | Date | Impact |
|---------|------|--------|
| `huggingface_hub v0.30.0` / `hf-xet 0.1.x` | Mar 2025 | Moderate — synchronous `with_gil()` per chunk, but hf_xet was an optional extra |
| `huggingface_hub v0.31.0` / `hf-xet >=1.1.0` | May 2025 | Moderate — hf-xet became a hard dependency on x86_64/arm64 |
| `hf-xet v1.1.3` | Jun 2025 | Upload path fixed with `AggregatingProgressUpdater` (PR #340); download path left unprotected |
| `hf-xet v1.3.0` | Feb 2026 | **Severe** — PR #645 added detailed per-chunk progress tracking to downloads, doubling callback frequency without aggregation |

PR #340 explicitly noted: *"each [update] has to acquire a global GIL
lock. This negatively affects the upload speed on fast connections"* —
the same problem, but only the upload side was addressed.

## Benchmarks

Downloading 3 safetensors files (16.1 GB total) from
`Qwen/Qwen3.5-35B-A3B` on a 25 Gbps machine:

| Test | Before | After |
|------|--------|-------|
| `download_files()` with `progress_updater=None` (baseline) | 3119 MB/s | 3119 MB/s |
| `download_files()` with per-file Python callbacks | **746 MB/s** | **1789 MB/s** |
| `snapshot_download()` (full Python CLI path with tqdm) | ~750 MB/s | **2395 MB/s** |

Progress callback overhead drops from **4x slowdown to <1%**.
