
datafusion-bio-functions


Apache DataFusion user-defined functions for bioinformatics, enabling SQL-based genomic analysis.

Overview

This workspace provides a collection of Rust crates that implement DataFusion UDFs (user-defined functions) for bioinformatics. It is the companion to datafusion-bio-formats, which provides the bioinformatics file format readers. Both are consumed by polars-bio.

Crates

| Crate | Description |
| --- | --- |
| datafusion-bio-function-pileup | Depth-of-coverage (pileup) computation from BAM alignments |
| datafusion-bio-function-ranges | Interval join, coverage, count-overlaps, nearest-neighbor, overlap, merge, cluster, complement, and subtract operations |
| datafusion-bio-function-vep | VEP variant annotation via the lookup_variants() table function, with Parquet and Fjall KV cache backends |

Features

Pileup (depth-of-coverage from BAM)

  • Depth-of-Coverage: Compute per-base depth from BAM alignment data using an efficient event-based algorithm
  • SQL Interface: Query coverage via SQL table function: SELECT * FROM depth('file.bam')
  • Per-Base Output: Optional one-row-per-position mode (like samtools depth -a) with lazy streaming to keep memory at O(batch_size)
  • DataFusion Integration: Native ExecutionPlan implementation with partition-parallel execution
  • Mosdepth-Compatible: Fast-mode behavior matching mosdepth (no mate-pair overlap deduplication)
  • Binary CIGAR: Zero-copy binary CIGAR processing — walks packed 4-byte ops directly from BAM, no string parsing
  • Dense Accumulation: Mosdepth-style flat i32[] depth arrays with O(1) writes and streaming per-contig emission

Ranges (interval operations)

  • Interval Join: SQL joins with range overlap conditions, automatically optimized via a custom physical planner
  • Coverage: Base-pair overlap depth between two interval sets via SELECT * FROM coverage('reads', 'targets')
  • Count Overlaps: Count overlapping intervals per region via SELECT * FROM count_overlaps('reads', 'targets')
  • Nearest: Nearest-neighbor interval matching via SQL join (CoitreesNearest) or nearest(...) table function
  • Overlap: Find all pairs of overlapping intervals between two tables via SELECT * FROM overlap('reads', 'targets') — delegates to IntervalJoinExec for optimal performance
  • Merge: Merge overlapping intervals within a single table via SELECT * FROM merge('intervals') — sweep-line algorithm with configurable min_dist and strict/weak modes
  • Cluster: Annotate intervals with cluster membership (cluster ID + boundaries) via SELECT * FROM cluster('intervals') — groups overlapping/nearby intervals with configurable min_dist
  • Complement: Find gaps (uncovered regions) relative to optional chromsizes view via SELECT * FROM complement('intervals', 'chromsizes')
  • Subtract: Basepair-level set difference via SELECT * FROM subtract('left', 'right') — fragments left intervals at right-interval overlap boundaries
  • Multiple Algorithms: Coitrees (default), IntervalTree, ArrayIntervalTree, Lapper, SuperIntervals — selectable via SET bio.interval_join_algorithm
  • Transparent Optimization: Hash/nested-loop joins with range conditions are automatically replaced with interval joins
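
The merge, cluster, and complement operations above all reduce to a sort-then-sweep pass over intervals. As an illustration of that idea (a minimal self-contained sketch, not the crate's implementation), merging with a `min_dist` threshold looks like:

```rust
/// Merge overlapping intervals (or intervals within `min_dist` of each
/// other) on a single contig. Intervals are inclusive (start, end) pairs.
/// Sketch of the sweep-line idea only, not the crate's internal code.
fn merge_intervals(mut iv: Vec<(i64, i64)>, min_dist: i64) -> Vec<(i64, i64)> {
    iv.sort_unstable();
    let mut out: Vec<(i64, i64)> = Vec::new();
    for (s, e) in iv {
        match out.last_mut() {
            // Extend the current merged run when the next interval starts
            // within `min_dist` of its end; otherwise start a new run.
            Some((_, cur_e)) if s <= *cur_e + min_dist => *cur_e = (*cur_e).max(e),
            _ => out.push((s, e)),
        }
    }
    out
}

fn main() {
    let merged = merge_intervals(vec![(10, 20), (15, 30), (45, 50)], 0);
    assert_eq!(merged, vec![(10, 30), (45, 50)]);
}
```

Cluster is the same sweep, except each input row is annotated with the index and boundaries of the run it falls into instead of being collapsed.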

VEP Annotation (lookup + consequence scaffolding)

  • lookup_variants() Table Function: SQL-based variant annotation against a pre-built VEP cache
  • KV Cache Backend: fjall LSM-tree store with zstd dictionary compression for compact on-disk storage
  • Match Modes: exact, exact_or_colocated_ids, exact_or_vep_existing (indel-aware relaxed matching)
  • annotate_vep() Table Function: backend-unified consequence annotation entrypoint (parquet or fjall) with phased CSQ rollout
  • Session Configuration: Tunable parameters via SQL SET statements under the bio.annotation namespace

annotate_vep API (Phase 1)

annotate_vep(
  vcf_table,
  cache_source,
  backend
  [, options_json]
)
  • vcf_table: registered input VCF table.
  • cache_source: backend-specific source path/identifier.
  • backend: parquet or fjall.
  • options_json (optional): backend/runtime options. Current keys:
    • transcripts_table: registered table name with transcript intervals/coding context.
    • exons_table: registered table name with exon intervals/order.
    • regulatory_table: registered table name with regulatory intervals.
    • motif_table: registered table name with motif/TFBS intervals.
    • mirna_table: registered table name with mature miRNA intervals.
    • sv_table: registered table name with structural-event overlaps.

Current phase behavior:

  • pass-through VCF rows,
  • derive known-variant metadata via lookup_variants,
  • if transcript/exon context tables are available (auto-discovery or options_json), compute transcript-aware SO terms and ranked most_severe_consequence,
  • otherwise fall back to initial cache-hit CSQ placeholder (sequence_variant) for matched rows and keep unmatched rows as NULL.

Ensembl-VEP Porting Plan

Detailed analysis and implementation plan are in:

Feature comparison snapshot:

| Capability | Ensembl-VEP (release 115) | bio-functions-vep (current) |
| --- | --- | --- |
| Known-variant cache lookup | ✅ (--check_existing, colocated behavior) | ✅ (lookup_variants + match modes) |
| Transcript consequence engine | ✅ (full SO consequence model) | 🚧 transcript/exon phase-2 baseline merged (not full VEP parity) |
| Supported consequence terms | 41/41 | 🚧 41/41 term handlers wired; codon-accurate parity still in progress |
| Most severe consequence output | ✅ | 🚧 ranked SO output when transcript/exon context is available; fallback placeholder otherwise |
| Backend abstraction | cache/files in the VEP ecosystem | ✅ unified API for the parquet + fjall entrypoints |
| Native SQL execution in DataFusion | ❌ | ✅ |
| Fjall KV backend | ❌ | ✅ |

Golden Benchmark (annotate_vep vs Ensembl VEP 115)

Benchmark input copied into this repo:

  • vep-benchmark/data/HG002_chr22.vcf.gz
  • vep-benchmark/data/HG002_chr22.vcf.gz.tbi

Run the golden benchmark (samples first 1000 variants, runs Ensembl VEP 115 in Docker, runs annotate_vep, and writes a comparison report):

cargo run -p datafusion-bio-function-vep --example annotate_vep_golden_bench --release -- \
  vep-benchmark/data/HG002_chr22.vcf.gz \
  /Users/mwiewior/research/data/vep/115_GRCh38_variants.parquet \
  parquet \
  1000 \
  /Users/mwiewior/research/git/polars-bio-vep-benchmark/vep-benchmark/cache \
  vep-benchmark/data/HG002_chr22.vcf.gz \
  /tmp/annotate_vep_golden_bench

Output report:

  • /tmp/annotate_vep_golden_bench/HG002_chr22_1000_comparison_report.txt

Cache Backend Selection (lookup_variants)

lookup_variants(vcf_table, cache_table, ...) supports both backends through the same API:

  • Parquet or other scan-capable table providers: uses interval-join execution.
  • Fjall KvCacheTableProvider: dispatches to direct KV lookup execution.

Both cache backends must expose the same required cache column names: chrom, start, end, variation_name, allele_string.

lookup_variants API

lookup_variants(
  vcf_table,
  cache_table
  [, columns]
  [, match_mode]
  [, extended_probes]
)
  • vcf_table: registered VCF input table name.
  • cache_table: registered annotation cache table/provider name (Parquet or Fjall).
  • columns (optional): comma-separated cache columns to project. Default: all cache columns except chrom,start,end and source_*.
  • match_mode (optional, default exact):
    • exact: interval overlap + exact allele matching only.
    • exact_or_colocated_ids: runs exact; for unmatched rows, fills fallback-capable columns (variation_name, somatic) from co-located overlap IDs.
    • exact_or_vep_existing: runs exact; for unmatched rows, fills fallback-capable columns from indel-aware relaxed allele-compatible co-located rows (prefers rs* IDs for variation_name).
  • extended_probes (optional, default false): enables wider coordinate probing for VEP-style insertion/deletion coordinate encodings.
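
To make the match-mode precedence concrete, here is a minimal sketch of the exact-then-fallback order described above. The `CacheRow` struct and its field names are illustrative assumptions for this sketch only, not the crate's actual schema or types:

```rust
// Illustrative cache row; fields are assumptions, not the crate's schema.
struct CacheRow { start: i64, end: i64, allele: String, id: String }

/// Sketch of exact matching with co-located fallback: try interval overlap
/// plus exact allele string first; for misses, fall back to any co-located
/// overlap, preferring rs* identifiers (as in exact_or_vep_existing).
fn match_variant<'a>(rows: &'a [CacheRow], start: i64, end: i64, allele: &str) -> Option<&'a str> {
    // exact: interval overlap + exact allele match
    if let Some(r) = rows.iter().find(|r| r.start <= end && r.end >= start && r.allele == allele) {
        return Some(&r.id);
    }
    // fallback: co-located overlaps, rs* ids sorted first (stable sort)
    let mut co: Vec<&CacheRow> = rows.iter().filter(|r| r.start <= end && r.end >= start).collect();
    co.sort_by_key(|r| !r.id.starts_with("rs"));
    co.first().map(|r| r.id.as_str())
}

fn main() {
    let rows = vec![
        CacheRow { start: 100, end: 100, allele: "A/T".into(), id: "rs123".into() },
        CacheRow { start: 100, end: 101, allele: "A/G".into(), id: "COSV1".into() },
    ];
    assert_eq!(match_variant(&rows, 100, 100, "A/G"), Some("COSV1")); // exact wins
    assert_eq!(match_variant(&rows, 100, 100, "A/C"), Some("rs123")); // fallback prefers rs*
}
```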

Example:

SELECT *
FROM lookup_variants('vcf_data', 'var_cache', 'variation_name,clin_sig', 'exact');

Parquet-backed cache table:

use datafusion::prelude::SessionContext;
use datafusion_bio_function_vep::register_vep_functions;

let ctx = SessionContext::new();
register_vep_functions(&ctx);

ctx.register_parquet("var_cache", "/path/to/variation_cache.parquet", Default::default()).await?;
let df = ctx
    .sql("SELECT * FROM lookup_variants('vcf', 'var_cache', 'variation_name,clin_sig')")
    .await?;

Fjall-backed cache provider:

use std::sync::Arc;
use datafusion::prelude::SessionContext;
use datafusion_bio_function_vep::register_vep_functions;
use datafusion_bio_function_vep::kv_cache::KvCacheTableProvider;

let ctx = SessionContext::new();
register_vep_functions(&ctx);

let kv_cache = KvCacheTableProvider::open("/path/to/variation_fjall")?;
ctx.register_table("var_cache", Arc::new(kv_cache))?;
let df = ctx
    .sql("SELECT * FROM lookup_variants('vcf', 'var_cache', 'variation_name,clin_sig')")
    .await?;

Create Fjall Cache

Create a full Fjall cache from a Parquet variation cache:

cargo run -p datafusion-bio-function-vep --example load_cache_full --features kv-cache --release -- \
  /path/to/115_GRCh38_variants.parquet \
  /path/to/variation_fjall \
  8
  • Third argument is thread count (target_partitions + loader parallelism).
  • Output path is recreated if it already exists.

Create a filtered cache (single chromosome) and optionally tune compression:

cargo run -p datafusion-bio-function-vep --example load_cache --features kv-cache --release -- \
  /path/to/115_GRCh38_variants.parquet \
  /path/to/variation_fjall_chr22 \
  22 \
  8 \
  9 \
  256
  • Args: <parquet_path> <fjall_output_path> [chrom_filter] [partitions] [zstd_level] [dict_size_kb]
  • Defaults for load_cache: zstd_level=3, dict_size_kb=112.

Annotation Configuration

Register AnnotationConfig on the session to enable SQL-level parameter overrides. If not registered, compiled defaults apply.

-- fjall block cache size in MB (default: 1024)
-- Larger values reduce cold-start latency by caching more LSM block index
-- pages and data blocks in memory.
SET bio.annotation.cache_size_mb = 2048;

-- Zstd compression level for cache writes (default: 3)
-- Higher levels produce smaller caches at the cost of slower writes.
-- Decompression speed is constant regardless of level.
-- Level 9 is a good balance for write-once caches.
SET bio.annotation.zstd_level = 9;

-- Zstd dictionary size in KB for cache writes (default: 112)
-- Larger dictionaries can improve compression ratio.
-- 256 KB is recommended with level 9.
SET bio.annotation.dict_size_kb = 256;

| Parameter | Default | Description |
| --- | --- | --- |
| bio.annotation.cache_size_mb | 1024 | fjall block cache size in MB for reads |
| bio.annotation.zstd_level | 3 | Zstd compression level for cache writes (1–19) |
| bio.annotation.dict_size_kb | 112 | Zstd dictionary size in KB for cache writes |

Registration from downstream (e.g., polars-bio):

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_bio_function_vep::{AnnotationConfig, register_vep_functions};
use datafusion_bio_function_vep::config::resolve;

// Register the config extension on the session
let config = SessionConfig::new()
    .with_option_extension(AnnotationConfig::default());
let ctx = SessionContext::new_with_config(config);
register_vep_functions(&ctx);

// Read resolved values (SQL overrides or defaults)
let ann = resolve(&ctx);

Installation

Add the crates you need to your Cargo.toml:

[dependencies]
datafusion = "50.3.0"
datafusion-bio-function-pileup = { git = "https://github.com/biodatageeks/datafusion-bio-functions" }
datafusion-bio-function-ranges = { git = "https://github.com/biodatageeks/datafusion-bio-functions" }

Quick Start

Coverage via SQL Table Function

The simplest way to compute coverage is via the SQL depth() table function:

use datafusion::prelude::*;
use datafusion_bio_function_pileup::register_pileup_functions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    register_pileup_functions(&ctx);

    // 1-based coordinates (default, matches polars-bio)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam')").await?;
    df.show().await?;

    // 0-based coordinates (explicit)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam', true)").await?;
    df.show().await?;

    // Per-base output: one row per genomic position (like samtools depth -a)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam', true, true)").await?;
    df.show().await?;

    Ok(())
}

The optional arguments control behavior:

SELECT * FROM depth('file.bam')                -- 1-based, block output (default)
SELECT * FROM depth('file.bam', true)          -- 0-based, block output
SELECT * FROM depth('file.bam', false)         -- 1-based, block output (explicit)
SELECT * FROM depth('file.bam', true, true)    -- 0-based, per-base output
SELECT * FROM depth('file.bam', false, true)   -- 1-based, per-base output

| Argument | Type | Default | Description |
| --- | --- | --- | --- |
| path | string | (required) | Path to the BAM file |
| zero_based | bool | false | Use 0-based coordinates |
| per_base | bool | false | Emit one row per position instead of RLE blocks |

Block output schema (default)

| Column | Type | Description |
| --- | --- | --- |
| contig | Utf8 | Chromosome/contig name |
| pos_start | Int32 | Block start position (inclusive) |
| pos_end | Int32 | Block end position (inclusive) |
| coverage | Int16 | Read depth in this block |

Per-base output schema (per_base=true)

| Column | Type | Description |
| --- | --- | --- |
| contig | Utf8 | Chromosome/contig name |
| pos | Int32 | Genomic position |
| coverage | Int16 | Read depth at this position |

Per-base mode emits one row for every genomic position in each contig (including 0-coverage positions), similar to samtools depth -a. It requires dense mode (BAM header with contig lengths) and uses a lazy PerBaseEmitter that streams positions in batch-sized chunks, keeping memory at O(batch_size).

Both schemas include metadata key bio.coordinate_system_zero_based ("true" or "false") so downstream consumers can interpret the coordinate system.

Interval Join via SQL

Create a bio-configured session and write standard SQL — range overlap joins are automatically optimized:

use datafusion_bio_function_ranges::create_bio_session;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = create_bio_session();

    // Register your tables (CSV, Parquet, or any DataFusion table provider)
    ctx.register_csv("reads", "reads.csv", Default::default()).await?;
    ctx.register_csv("targets", "targets.csv", Default::default()).await?;

    // Range overlap joins are automatically optimized as interval joins
    let df = ctx.sql("
        SELECT *
        FROM reads
        JOIN targets
          ON reads.contig = targets.contig
          AND reads.pos_start <= targets.pos_end
          AND reads.pos_end >= targets.pos_start
    ").await?;
    df.show().await?;

    Ok(())
}

Coverage and Count Overlaps via SQL Table Functions

-- Count overlapping intervals per target region
SELECT * FROM count_overlaps('reads', 'targets')

-- Base-pair coverage depth
SELECT * FROM coverage('reads', 'targets')

-- Custom column names (shared for both tables)
SELECT * FROM coverage('reads', 'targets', 'chrom', 'start', 'end')

-- Separate column names for left and right tables
SELECT * FROM coverage('reads', 'targets', 'chrom', 'start', 'end', 'contig', 'pos_start', 'pos_end')

-- 0-based half-open coordinates
SELECT * FROM count_overlaps('reads', 'targets', 'contig', 'pos_start', 'pos_end', 'strict')

-- Nearest neighbors (left_* and right_* output columns)
SELECT * FROM nearest('targets', 'reads')

-- Top-3 nearest neighbors per right interval, ignoring overlaps
SELECT * FROM nearest('targets', 'reads', 3, false)

-- Default k=1, ignore overlaps
SELECT * FROM nearest('targets', 'reads', false)

-- 0-based half-open coordinates
SELECT * FROM nearest('targets', 'reads', 1, true, 'contig', 'pos_start', 'pos_end', 'strict')

-- All pairs of overlapping intervals (left_* and right_* output columns)
SELECT * FROM overlap('reads', 'targets')

-- Overlap with 0-based half-open coordinates
SELECT * FROM overlap('reads', 'targets', 'contig', 'pos_start', 'pos_end', 'strict')

-- Separate column names for left and right tables
SELECT * FROM overlap('reads', 'targets', 'l_chr', 'l_s', 'l_e', 'r_chr', 'r_s', 'r_e')

-- Merge overlapping intervals within a single table
SELECT * FROM merge('intervals')

-- Merge with minimum distance between intervals
SELECT * FROM merge('intervals', 10)

-- Merge with custom columns and 0-based half-open coordinates
SELECT * FROM merge('intervals', 0, 'contig', 'pos_start', 'pos_end', 'strict')

-- Cluster: annotate intervals with cluster ID and boundaries
SELECT * FROM cluster('intervals')

-- Cluster with minimum distance between clusters
SELECT * FROM cluster('intervals', 5)

-- Cluster with custom columns and 0-based half-open coordinates
SELECT * FROM cluster('intervals', 0, 'contig', 'pos_start', 'pos_end', 'strict')

-- Complement: find gaps (uncovered regions) with default view (0..MAX per contig)
SELECT * FROM complement('intervals')

-- Complement with chromsizes view table
SELECT * FROM complement('intervals', 'chromsizes')

-- Complement with custom column names
SELECT * FROM complement('intervals', 'chromsizes', 'chrom', 'start', 'end')

-- Subtract: basepair-level set difference
SELECT * FROM subtract('left_table', 'right_table')

-- Subtract with custom columns and strict mode
SELECT * FROM subtract('left_table', 'right_table', 'chrom', 'start', 'end', 'strict')

nearest() accepted forms:

  • nearest('left', 'right')
  • nearest('left', 'right', k)
  • nearest('left', 'right', overlap_bool)
  • nearest('left', 'right', k, overlap_bool)
  • optional shared or separate column names, with optional trailing 'strict'|'weak'
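
Among the operations above, subtract's fragmenting behavior is easy to pin down in isolation. A minimal sketch (illustrative only; assumes sorted right intervals and inclusive coordinates, not the crate's actual code path):

```rust
/// Clip one left interval against sorted, inclusive right intervals,
/// emitting the surviving fragments (basepair-level set difference).
fn subtract_one(left: (i64, i64), rights: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let (mut cur, end) = left;
    let mut out = Vec::new();
    for &(rs, re) in rights {
        if re < cur || rs > end { continue; }       // no overlap with remainder
        if rs > cur { out.push((cur, rs - 1)); }    // fragment before the overlap
        cur = cur.max(re + 1);                      // resume after the overlap
        if cur > end { return out; }                // left interval fully consumed
    }
    out.push((cur, end));
    out
}

fn main() {
    // (1, 10) minus (3, 4) and (7, 8) leaves three fragments
    assert_eq!(subtract_one((1, 10), &[(3, 4), (7, 8)]), vec![(1, 2), (5, 6), (9, 10)]);
}
```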

Nearest Interval Matching

SET bio.interval_join_algorithm = CoitreesNearest;

-- Returns exactly one match per right-side row:
-- the overlapping interval if one exists, otherwise the nearest by distance
-- if no keyed candidate exists, left columns are NULL
SELECT *
FROM targets
JOIN reads
  ON targets.contig = reads.contig
  AND targets.pos_start <= reads.pos_end
  AND targets.pos_end >= reads.pos_start

Algorithm Selection

SET bio.interval_join_algorithm = Coitrees;        -- default, best general performance
SET bio.interval_join_algorithm = IntervalTree;     -- rust-bio interval tree
SET bio.interval_join_algorithm = ArrayIntervalTree; -- rust-bio array-backed
SET bio.interval_join_algorithm = Lapper;           -- rust-lapper
SET bio.interval_join_algorithm = SuperIntervals;   -- SIMD-optimized (AVX2/NEON)

Coverage via Programmatic API

For more control, use PileupExec directly:

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan; // brings execute() into scope
use datafusion::prelude::*;
use datafusion_bio_format_bam::table_provider::BamTableProvider;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register BAM file as a table (zero_based=false for 1-based coordinates)
    let table = BamTableProvider::new("path/to/alignments.bam".to_string(), None, false, None, true).await?;
    let ctx = SessionContext::new();
    ctx.register_table("reads", Arc::new(table))?;

    // Create physical plan and wrap with PileupExec
    let df = ctx.table("reads").await?;
    let plan = df.create_physical_plan().await?;
    let pileup = PileupExec::new(plan, PileupConfig::default()); // zero_based=false by default

    // Execute and collect results
    let task_ctx = ctx.task_ctx();
    let stream = pileup.execute(0, task_ctx)?;
    let batches: Vec<_> = stream.collect::<Vec<_>>().await
        .into_iter()
        .filter_map(|r| r.ok())
        .collect();

    for batch in &batches {
        println!("{:?}", batch);
    }
    Ok(())
}

Filtering

Coverage computation automatically filters reads using SAM flags (default: exclude unmapped, secondary, failed QC, duplicate reads) and minimum mapping quality:

use datafusion_bio_function_pileup::ReadFilter;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};

let config = PileupConfig {
    filter: ReadFilter {
        filter_flag: 1796,        // unmapped | secondary | failed_qc | duplicate
        min_mapping_quality: 20,  // minimum MAPQ
    },
    ..PileupConfig::default()     // zero_based=false, binary_cigar=true, dense_mode=Auto
};
let pileup = PileupExec::new(plan, config);
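
The default filter_flag value 1796 is simply the bitwise OR of the standard SAM flag bits, so downstream code can build it from named constants (bit values are from the SAM specification; the constant names here are illustrative):

```rust
// Standard SAM flag bits (values per the SAM specification).
const UNMAPPED: u16 = 0x4;
const SECONDARY: u16 = 0x100;
const QC_FAIL: u16 = 0x200;
const DUPLICATE: u16 = 0x400;

fn main() {
    let filter_flag = UNMAPPED | SECONDARY | QC_FAIL | DUPLICATE;
    assert_eq!(filter_flag, 1796); // matches the crate's documented default

    // A read is excluded when any filtered bit is set in its FLAG field:
    let flags: u16 = 0x400; // a duplicate read
    assert!(flags & filter_flag != 0);
}
```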

Downstream Library Integration

This crate is designed to be consumed by downstream libraries such as polars-bio. This section documents the key APIs and configuration options.

Dependency Setup

Pin to a specific commit to ensure reproducible builds:

[dependencies]
datafusion-bio-function-pileup = { git = "https://github.com/biodatageeks/datafusion-bio-functions", rev = "<commit-hash>" }
datafusion-bio-format-bam = { git = "https://github.com/biodatageeks/datafusion-bio-formats", rev = "<commit-hash>" }

Keep datafusion-bio-formats and datafusion-bio-functions revisions in sync — they must use the same DataFusion/Arrow versions (currently DataFusion 50.3.0, Arrow 56.x).

Feature Flags

| Feature | Default | Description |
| --- | --- | --- |
| bam | yes | Enables the table_function module (DepthFunction, register_pileup_functions), which depends on datafusion-bio-format-bam |

Downstream libraries that provide their own TableProvider (e.g., polars-bio) can disable the default feature to avoid pulling in datafusion-bio-format-bam as a runtime dependency:

[dependencies]
datafusion-bio-function-pileup = { git = "...", rev = "...", default-features = false }

The core pileup API (PileupExec, PileupConfig, cigar::*, events::*, coverage::*) is always available regardless of features.

BamTableProvider API

The BAM reader accepts a binary_cigar parameter:

BamTableProvider::new(
    file_path,           // String: path to BAM file (local or object-store URL)
    storage_opts,        // Option<ObjectStorageOptions>: S3/GCS/Azure credentials
    zero_based,          // bool: false for 1-based (default), true for 0-based
    tag_fields,          // Option<Vec<String>>: optional BAM tag fields to include
    binary_cigar,        // bool: true for zero-copy binary CIGAR (recommended)
).await?

When binary_cigar=true, the CIGAR column has DataType::Binary (packed LE u32 ops) instead of DataType::Utf8 (string like "10M2I5M"). The pileup crate auto-detects the format from the schema.
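
For reference, the packed encoding follows the BAM specification: each little-endian u32 stores the op length in the high 28 bits and an index into "MIDNSHP=X" in the low 4 bits. A minimal decoder sketch (the crate itself walks these bytes zero-copy rather than materializing pairs):

```rust
/// Decode packed little-endian BAM CIGAR words into (length, op) pairs.
/// Per the BAM spec: low 4 bits index into "MIDNSHP=X", high 28 bits
/// hold the op length. Sketch only; not the crate's zero-copy walker.
fn decode_cigar(raw: &[u8]) -> Vec<(u32, char)> {
    const OPS: &[u8] = b"MIDNSHP=X";
    raw.chunks_exact(4)
        .map(|c| {
            let w = u32::from_le_bytes([c[0], c[1], c[2], c[3]]);
            (w >> 4, OPS[(w & 0xF) as usize] as char)
        })
        .collect()
}

fn main() {
    // "10M2I5M" encoded as three packed little-endian words
    let raw: Vec<u8> = [(10u32 << 4) | 0, (2 << 4) | 1, (5 << 4) | 0]
        .iter()
        .flat_map(|w| w.to_le_bytes())
        .collect();
    assert_eq!(decode_cigar(&raw), vec![(10, 'M'), (2, 'I'), (5, 'M')]);
}
```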

PileupConfig Defaults

PileupConfig::default() uses the fastest settings:

| Field | Default | Description |
| --- | --- | --- |
| zero_based | false | 1-based coordinates (matches the polars-bio default) |
| per_base | false | RLE block output; set true for one row per position |
| binary_cigar | true | Use binary CIGAR (zero-copy, no string parsing) |
| dense_mode | Auto (dense) | Dense i32[] array accumulation with streaming emission |
| filter.filter_flag | 1796 | Exclude unmapped, secondary, failed-QC, and duplicate reads |
| filter.min_mapping_quality | 0 | No MAPQ threshold |

To override specific fields:

let config = PileupConfig {
    zero_based: true,                       // 0-based coordinates
    per_base: true,                         // one row per position (requires dense mode)
    binary_cigar: false,                    // force string CIGAR (e.g., for debugging)
    dense_mode: DenseMode::Disable,         // force sparse BTreeMap accumulation
    filter: ReadFilter {
        min_mapping_quality: 20,
        ..ReadFilter::default()
    },
    ..PileupConfig::default()
};

Accumulation Strategies

| Strategy | Best for | Memory | How |
| --- | --- | --- | --- |
| Dense (default) | WGS, high coverage | One contig at a time | Flat i32[] indexed by position |
| Sparse | WES, targeted panels | Proportional to read count | BTreeMap<pos, delta> per contig |

Dense mode requires BAM header metadata (contig lengths) in the schema. If unavailable (e.g., MemTable in tests), it falls back to sparse automatically.
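
The sparse strategy can be sketched in a few lines of plain Rust (illustrative only, using inclusive coordinates; not the crate's internal types):

```rust
use std::collections::BTreeMap;

/// Sparse accumulation sketch: store +1/-1 events in a BTreeMap keyed by
/// position, then prefix-sum in key order to recover depth at each change
/// point. The dense path uses a flat i32 array indexed by position instead.
fn sparse_depth(reads: &[(i64, i64)]) -> BTreeMap<i64, i32> {
    let mut delta: BTreeMap<i64, i32> = BTreeMap::new();
    for &(start, end) in reads {
        *delta.entry(start).or_insert(0) += 1;   // coverage rises at read start
        *delta.entry(end + 1).or_insert(0) -= 1; // and drops past the read end
    }
    // Running sum over event positions yields depth after each change point.
    let mut depth = 0;
    delta.into_iter().map(|(pos, d)| { depth += d; (pos, depth) }).collect()
}

fn main() {
    let d = sparse_depth(&[(1, 5), (3, 7)]);
    assert_eq!(d.get(&3), Some(&2)); // both reads cover position 3
}
```

The memory trade-off is visible here: the map holds two entries per read regardless of contig length, which wins for sparse targeted panels but loses to a flat array once most positions are covered.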

End-to-End Example (polars-bio style)

use std::sync::Arc;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan; // execute()/properties() trait methods
use datafusion::prelude::*;
use datafusion_bio_format_bam::table_provider::BamTableProvider;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};
use futures::StreamExt;

async fn compute_coverage(bam_path: &str, partitions: usize) -> Vec<RecordBatch> {
    let table = BamTableProvider::new(
        bam_path.to_string(), None, false, None, true,  // zero_based=false, binary_cigar=true
    ).await.unwrap();

    let config = SessionConfig::new().with_target_partitions(partitions);
    let ctx = SessionContext::new_with_config(config);
    ctx.register_table("reads", Arc::new(table)).unwrap();

    // Select only columns needed for coverage (avoids decoding sequence, quality, tags)
    let df = ctx.sql("SELECT chrom, start, flags, cigar, mapping_quality FROM reads")
        .await.unwrap();
    let plan = df.create_physical_plan().await.unwrap();

    let pileup = Arc::new(PileupExec::new(plan, PileupConfig::default()));
    let task_ctx = ctx.task_ctx();

    // Run all partitions concurrently
    let mut handles = Vec::new();
    for partition in 0..pileup.properties().partitioning.partition_count() {
        let pileup = pileup.clone();
        let task_ctx = task_ctx.clone();
        handles.push(tokio::spawn(async move {
            pileup.execute(partition, task_ctx).unwrap()
                .collect::<Vec<_>>().await
                .into_iter()
                .filter_map(|r| r.ok())
                .collect::<Vec<_>>()
        }));
    }

    let mut all_batches = Vec::new();
    for handle in handles {
        all_batches.extend(handle.await.unwrap());
    }
    all_batches
}

Algorithm

The coverage algorithm is ported from SeQuiLa and uses an event-based approach:

  1. CIGAR Processing: Each aligned read's CIGAR generates +1 (start) and -1 (end) events at reference positions. With binary CIGAR (default), packed 4-byte ops are walked directly — no string parsing.
  2. Dense Accumulation (default): Events are written to a flat i32[] array per contig (O(1) writes). Completed contigs are streamed out as soon as the BAM's coordinate-sorted order moves to the next contig — peak memory is one contig at a time.
  3. Output Generation:
    • Block mode (default): A sweep-line pass computes running coverage and emits run-length encoded blocks where coverage changes. Zero-coverage gaps are skipped.
    • Per-base mode (per_base=true): A lazy PerBaseEmitter walks the delta array with a prefix sum, emitting one row per genomic position (including 0-coverage). Batches are streamed one at a time to keep memory at O(batch_size).

The dense path naturally handles complex CIGAR operations (insertions, deletions, skipped regions) and provides excellent cache locality. A sparse BTreeMap fallback is available for targeted sequencing panels where contig-level arrays would be wasteful.
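
The dense path can be sketched end-to-end in a few lines (illustrative only; 0-based inclusive coordinates, a single contig, and no CIGAR handling):

```rust
/// Dense-path sketch of the event-based algorithm: write +1/-1 deltas into
/// a flat array, then sweep with a running sum and emit RLE blocks of
/// (start, end inclusive, depth), skipping zero-coverage gaps.
fn coverage_blocks(reads: &[(usize, usize)], contig_len: usize) -> Vec<(usize, usize, i32)> {
    let mut delta = vec![0i32; contig_len + 2];
    for &(s, e) in reads {
        delta[s] += 1;     // O(1) write at read start
        delta[e + 1] -= 1; // coverage ends after the last aligned base
    }
    let (mut blocks, mut depth, mut block_start) = (Vec::new(), 0i32, 0usize);
    for pos in 0..delta.len() {
        if delta[pos] != 0 {
            if depth > 0 {
                blocks.push((block_start, pos - 1, depth)); // close the run
            }
            depth += delta[pos];
            block_start = pos; // next run (or gap, if depth fell to 0) starts here
        }
    }
    blocks
}

fn main() {
    // reads covering 1..=5 and 3..=7 produce three depth runs
    assert_eq!(
        coverage_blocks(&[(1, 5), (3, 7)], 10),
        vec![(1, 2, 1), (3, 5, 2), (6, 7, 1)]
    );
}
```

Per-base mode is the same sweep, but instead of closing runs at change points it emits every position with its running depth, batch by batch.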

Validation Against samtools

The coverage output has been validated position-by-position against samtools depth -a on a 2.0 GB WES BAM file (NA12878 chr1, 19.1M reads, 247M genomic positions). Both block and per-base output modes produce zero mismatches across all 247,249,719 positions.

Running the validation

1. Generate reference output with samtools:

samtools depth -a /path/to/alignments.bam > /tmp/samtools_depth.tsv

2. Generate our output using the dump_coverage example:

# Block output (BED format: contig, start, end_exclusive, coverage)
cargo run --release --example dump_coverage -- /path/to/alignments.bam --zero-based > /tmp/our_blocks.bed

# Per-base output (contig, pos_0based, coverage)
cargo run --release --example dump_coverage -- /path/to/alignments.bam --zero-based --per-base > /tmp/our_per_base.tsv

3. Compare with the provided script:

# Compare both outputs against samtools
python3 datafusion/bio-function-pileup/scripts/compare_samtools.py \
    /tmp/samtools_depth.tsv \
    --per-base /tmp/our_per_base.tsv \
    --blocks /tmp/our_blocks.bed

# Compare only block output
python3 datafusion/bio-function-pileup/scripts/compare_samtools.py \
    /tmp/samtools_depth.tsv \
    --blocks /tmp/our_blocks.bed

4. Verify multi-partition consistency:

The bench_coverage example reports block counts. These should be identical regardless of --partitions:

cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 1
cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 8
cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 16

Validated results (NA12878 chr1)

| Output mode | Positions compared | Mismatches |
| --- | --- | --- |
| Block (expanded to per-base) | 247,249,719 | 0 |
| Per-base | 247,249,719 | 0 |

| Partitions | Block count | Match? |
| --- | --- | --- |
| 1 | 7,658,496 | Yes |
| 4 | 7,658,496 | Yes |
| 8 | 7,658,496 | Yes |
| 16 | 7,658,496 | Yes |

Development

Build

cargo build

Test

cargo test --all

Format

cargo fmt --all

Lint

cargo clippy --all-targets --all-features -- -D warnings

Documentation

cargo doc --no-deps --all-features --open

Requirements

  • Rust 1.91.0 or later (specified in rust-toolchain.toml)
  • Git (for building crates with git dependencies)

Related Projects

  • datafusion-bio-formats - DataFusion table providers for bioinformatics file formats
  • polars-bio - Polars extensions for bioinformatics
  • SeQuiLa - Original Scala implementation of the pileup algorithm

License

Licensed under Apache-2.0. See LICENSE for details.

Contributing

Contributions are welcome! Please feel free to submit issues or pull requests.

Acknowledgments

This project builds upon:

  • Apache DataFusion - Fast, extensible query engine
  • mosdepth - Test data and reference implementation for coverage validation
  • SeQuiLa - Original pileup and interval join algorithm design
  • coitrees - Cache-oblivious interval tree (default algorithm)
  • superintervals - SIMD-optimized interval tree (vendored, MIT license)
