Description
The guide on implementing striped downloads uses tokio::fs::File in a way that is not safe under concurrency: the download tasks concurrently seek the same file cursor. A seek from one task can move the cursor between another task's seek and its write, so chunks are written at the wrong position and the downloaded file is usually corrupted.
Fix
Instead of seek followed by write, use a positional write (pwrite), e.g. the synchronous write_all_at from std::os::unix::fs::FileExt, wrapped in tokio::task::spawn_blocking.
tokio does not offer an asynchronous write_at yet.
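A minimal sketch of the positional-write idea, using only std so it runs without any other crates. It is not the guide's code: write_stripes, the Arc<File> sharing, and the fill pattern are made up for illustration, and plain threads stand in for the download tasks. In the async guide, each write_all_at call would instead go inside tokio::task::spawn_blocking as described above. Unix only, because of std::os::unix::fs::FileExt.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // Unix-only: provides write_all_at (pwrite)
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: writes `stripes` chunks of `stripe_size` bytes
/// concurrently, each at its own offset. Stripe `i` is filled with the
/// byte `i as u8` so the result is easy to verify.
fn write_stripes(path: &str, stripes: usize, stripe_size: usize) -> io::Result<()> {
    // One file handle shared by all writers; no seek is ever issued on it.
    let file = Arc::new(File::create(path)?);

    let handles: Vec<_> = (0..stripes)
        .map(|i| {
            let file = Arc::clone(&file);
            thread::spawn(move || -> io::Result<()> {
                let chunk = vec![i as u8; stripe_size];
                let offset = (i * stripe_size) as u64;
                // Positional write: the offset is passed explicitly, so the
                // shared file cursor is neither read nor moved.
                file.write_all_at(&chunk, offset)
            })
        })
        .collect();

    // Wait for every writer and surface the first I/O error, if any.
    for handle in handles {
        handle.join().expect("writer thread panicked")?;
    }
    Ok(())
}
```

Calling write_stripes("/tmp/stripes.bin", 32, 1024 * 1024) should produce a 32 MiB file in which every stripe sits at its own offset regardless of the order in which the threads finish, since no writer depends on the cursor position.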
Test
Comparing the same object downloaded as a single stripe and as 32 stripes is usually enough to show the issue.
Add this test to striped.rs and change the bucket name:
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_striped_download() -> anyhow::Result<()> {
        const MB: usize = 1024 * 1024;
        let bucket_name = "projects/_/buckets/my_bucket";
        let destination_single = "/tmp/striped_download_single.txt";
        let destination_striped = "/tmp/striped_download_striped.txt";
        let client = Storage::builder().build().await?;
        let control = StorageControl::builder().build().await?;
        seed(client.clone(), control.clone(), bucket_name).await?;

        // Download with large stripe size (single stripe)
        println!("\n--- Download with 32 MiB stripe (single stripe) ---");
        download(
            client.clone(),
            control.clone(),
            bucket_name,
            "32MiB.txt",
            32 * MB,
            destination_single,
        )
        .await?;

        // Download with small stripe size (32 stripes)
        println!("\n--- Download with 1 MiB stripe (32 stripes) ---");
        download(
            client.clone(),
            control.clone(),
            bucket_name,
            "32MiB.txt",
            MB,
            destination_striped,
        )
        .await?;

        // Compute and compare hashes
        println!("\n--- Computing hashes ---");
        let hash_single = compute_sha256(destination_single).await?;
        let hash_striped = compute_sha256(destination_striped).await?;
        println!("Hash (single stripe): {}", hash_single);
        println!("Hash (32 stripes): {}", hash_striped);
        assert_eq!(
            hash_single, hash_striped,
            "Hashes don't match! Files are different."
        );
        println!("\n✓ Hashes match! Files are identical.");

        // Clean up
        std::fs::remove_file(destination_single).ok();
        std::fs::remove_file(destination_striped).ok();
        Ok(())
    }

    async fn compute_sha256(file_path: &str) -> anyhow::Result<String> {
        use sha2::{Digest, Sha256};
        use tokio::io::AsyncReadExt;

        let mut file = tokio::fs::File::open(file_path).await?;
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; 8192];
        loop {
            let n = file.read(&mut buffer).await?;
            if n == 0 {
                break;
            }
            hasher.update(&buffer[..n]);
        }
        let result = hasher.finalize();
        Ok(format!("{:x}", result))
    }
}
Output
--- Download with 32 MiB stripe (single stripe) ---
Completed 32.00 MiB download in 5.60917259s, using 1 stripes, effective bandwidth = 5.70 MiB/s
--- Download with 1 MiB stripe (32 stripes) ---
Completed 32.00 MiB download in 4.348293242s, using 32 stripes, effective bandwidth = 7.36 MiB/s
--- Computing hashes ---
Hash (single stripe): 37ac0b92c884a2cfb672e6f58e5c7f22161e1255d64a5a584960b07b55b0c709
Hash (32 stripes): e6e0754b97336d80ed9dcae670f1b367967a2a48985f2061de82d1951276e63c