Concurrency bug in striped_downloads.md guide #4730

@fellhorn

Description

The guide on implementing striped downloads uses tokio::fs::File in a way that is not safe under concurrency: each task seeks the shared file cursor to its stripe's offset and then writes, but the tasks run concurrently. Between one task's seek and its write, another task can move the cursor, so chunks are written at the wrong position and the downloaded file is usually corrupted.
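
A minimal std-only sketch of the underlying problem, assuming the stripes write through handles that share one cursor (the std::fs::File::try_clone documentation states that reads, writes, and seeks through either handle affect both). The guide's exact code is not reproduced here; the function shared_cursor_demo is hypothetical, and the interleaving that happens nondeterministically between tokio tasks is forced into a fixed order so the result is reproducible:

```rust
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Hypothetical demo: two handles to one file share a single cursor, so a
/// seek through one handle changes where the other handle's write lands.
fn shared_cursor_demo(path: &Path) -> std::io::Result<Vec<u8>> {
    let mut a = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    a.write_all(b"........")?; // 8 placeholder bytes; cursor is now at 8

    // try_clone shares the underlying handle, including its cursor.
    let mut b = a.try_clone()?;

    // "Task A" positions the cursor for its stripe at offset 0 ...
    a.seek(SeekFrom::Start(0))?;
    // ... but "task B" seeks to its own stripe before A gets to write.
    b.seek(SeekFrom::Start(4))?;
    // A's bytes land at offset 4 instead of 0.
    a.write_all(b"AAAA")?;

    let mut contents = Vec::new();
    File::open(path)?.read_to_end(&mut contents)?;
    Ok(contents)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("shared_cursor_demo.bin");
    let contents = shared_cursor_demo(&path)?;
    println!("{}", String::from_utf8_lossy(&contents));
    std::fs::remove_file(&path)?;
    Ok(())
}
```

This prints `....AAAA`: task A meant to fill bytes 0..4, but its data ended up at task B's offset.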

Fix

Instead of seeking and then writing, use a positional write (pwrite), which takes the offset as an argument and does not move the file cursor. For example, use the synchronous write_all_at from std::os::unix::fs::FileExt, wrapped in tokio::task::spawn_blocking.
Tokio does not yet provide an asynchronous write_at.
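
A sketch of the proposed fix using only the standard library. It is therefore Unix-only (std::os::unix::fs::FileExt) and uses OS threads as a stand-in for tokio::task::spawn_blocking. The helper striped_write and its parameters are hypothetical; in the guide, each stripe's bytes would come from the object read rather than from an in-memory slice. Because write_all_at takes the offset explicitly, the threads can share one File through an Arc without any seek:

```rust
use std::fs::OpenOptions;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: writes `data` to `path` in chunks of `stripe_size`,
/// one thread per chunk. Each thread uses `write_all_at` (pwrite), so no
/// thread moves a shared cursor and no chunk can land at the wrong offset.
fn striped_write(path: &Path, data: &[u8], stripe_size: usize) -> std::io::Result<()> {
    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    // Pre-size the file so it has its final length before any stripe lands.
    file.set_len(data.len() as u64)?;
    let file = Arc::new(file);

    let handles: Vec<_> = data
        .chunks(stripe_size)
        .enumerate()
        .map(|(i, chunk)| {
            let file = Arc::clone(&file);
            let chunk = chunk.to_vec();
            let offset = (i * stripe_size) as u64;
            // In the async guide, this body would run inside
            // tokio::task::spawn_blocking instead of a std thread.
            thread::spawn(move || file.write_all_at(&chunk, offset))
        })
        .collect();

    for handle in handles {
        handle.join().expect("stripe thread panicked")?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("striped_write_demo.bin");
    // 32 stripes of 1 KiB each, filled with a position-dependent pattern.
    let data: Vec<u8> = (0..32 * 1024).map(|i| (i % 251) as u8).collect();
    striped_write(&path, &data, 1024)?;
    assert_eq!(std::fs::read(&path)?, data);
    std::fs::remove_file(&path)?;
    println!("striped file matches the source bytes");
    Ok(())
}
```

Using a non-power-of-two pattern (i % 251) means a stripe written at the wrong offset would not accidentally match the expected bytes.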

Test

Comparing a file downloaded as a single stripe with the same file downloaded in 32 stripes is usually enough to expose the issue.
Add this test to striped.rs and change bucket_name to your own bucket:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_striped_download() -> anyhow::Result<()> {
        const MB: usize = 1024 * 1024;
        let bucket_name = "projects/_/buckets/my_bucket";
        let destination_single = "/tmp/striped_download_single.txt";
        let destination_striped = "/tmp/striped_download_striped.txt";

        let client = Storage::builder().build().await?;
        let control = StorageControl::builder().build().await?;

        seed(client.clone(), control.clone(), bucket_name).await?;

        // Download with large stripe size (single stripe)
        println!("\n--- Download with 32 MiB stripe (single stripe) ---");
        download(
            client.clone(),
            control.clone(),
            bucket_name,
            "32MiB.txt",
            32 * MB,
            destination_single,
        )
        .await?;

        // Download with small stripe size (32 stripes)
        println!("\n--- Download with 1 MiB stripe (32 stripes) ---");
        download(
            client.clone(),
            control.clone(),
            bucket_name,
            "32MiB.txt",
            MB,
            destination_striped,
        )
        .await?;

        // Compute and compare hashes
        println!("\n--- Computing hashes ---");
        let hash_single = compute_sha256(destination_single).await?;
        let hash_striped = compute_sha256(destination_striped).await?;

        println!("Hash (single stripe):  {}", hash_single);
        println!("Hash (32 stripes):     {}", hash_striped);

        assert_eq!(
            hash_single, hash_striped,
            "Hashes don't match! Files are different."
        );
        println!("\n✓ Hashes match! Files are identical.");

        // Clean up
        std::fs::remove_file(destination_single).ok();
        std::fs::remove_file(destination_striped).ok();

        Ok(())
    }

    async fn compute_sha256(file_path: &str) -> anyhow::Result<String> {
        use sha2::{Digest, Sha256};
        use tokio::io::AsyncReadExt;

        let mut file = tokio::fs::File::open(file_path).await?;
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; 8192];

        loop {
            let n = file.read(&mut buffer).await?;
            if n == 0 {
                break;
            }
            hasher.update(&buffer[..n]);
        }

        let result = hasher.finalize();
        Ok(format!("{:x}", result))
    }
}
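
The test above depends on the sha2 crate and only reports whether the two files differ. As a dependency-free alternative, a sketch (the helper first_mismatch is hypothetical) that reads both files fully and returns the first differing byte offset; dividing that offset by the stripe size points at the earliest stripe that was written incorrectly. Reading whole files into memory is assumed to be acceptable for a 32 MiB test object:

```rust
use std::io;
use std::path::Path;

/// Hypothetical helper: byte offset of the first difference between two
/// files, or None if they are identical (same bytes and same length).
fn first_mismatch(a: &Path, b: &Path) -> io::Result<Option<usize>> {
    let (x, y) = (std::fs::read(a)?, std::fs::read(b)?);
    let pos = x.iter().zip(&y).position(|(p, q)| p != q);
    Ok(match pos {
        Some(i) => Some(i),
        // Common prefix matches; a length difference is the first mismatch.
        None if x.len() != y.len() => Some(x.len().min(y.len())),
        None => None,
    })
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir();
    let (a, b) = (dir.join("mismatch_a.bin"), dir.join("mismatch_b.bin"));
    std::fs::write(&a, b"0123456789")?;
    std::fs::write(&b, b"0123x56789")?;
    // Hypothetical stripe size, used only to map the offset to a stripe index.
    let stripe_size = 4;
    match first_mismatch(&a, &b)? {
        Some(off) => println!("first difference at byte {off} (stripe {})", off / stripe_size),
        None => println!("files are identical"),
    }
    std::fs::remove_file(&a)?;
    std::fs::remove_file(&b)?;
    Ok(())
}
```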

Output

--- Download with 32 MiB stripe (single stripe) ---
Completed 32.00 MiB download in 5.60917259s, using 1 stripes, effective bandwidth = 5.70 MiB/s

--- Download with 1 MiB stripe (32 stripes) ---
Completed 32.00 MiB download in 4.348293242s, using 32 stripes, effective bandwidth = 7.36 MiB/s

--- Computing hashes ---
Hash (single stripe):  37ac0b92c884a2cfb672e6f58e5c7f22161e1255d64a5a584960b07b55b0c709
Hash (32 stripes):     e6e0754b97336d80ed9dcae670f1b367967a2a48985f2061de82d1951276e63c

Metadata

Labels

priority: p2 (Moderately-important priority. Fix may not be included in next release.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
