Apache DataFusion user-defined functions for bioinformatics, enabling SQL-based genomic analysis.
This workspace provides a collection of Rust crates that implement DataFusion UDFs (User-Defined Functions) for bioinformatics. It is the companion to datafusion-bio-formats (which handles bioinformatic file format readers). Both are consumed by polars-bio.
| Crate | Description | Status |
|---|---|---|
| datafusion-bio-function-pileup | Depth-of-coverage (pileup) computation from BAM alignments | ✅ |
| datafusion-bio-function-ranges | Interval join, coverage, count-overlaps, nearest-neighbor, overlap, merge, cluster, complement, and subtract operations | ✅ |
| datafusion-bio-function-vep | VEP variant annotation via `lookup_variants()` table function with parquet + Fjall KV cache backends | ✅ |
- Depth-of-Coverage: Compute per-base depth from BAM alignment data using an efficient event-based algorithm
- SQL Interface: Query coverage via SQL table function: `SELECT * FROM depth('file.bam')`
- Per-Base Output: Optional one-row-per-position mode (like `samtools depth -a`) with lazy streaming to keep memory at O(batch_size)
- DataFusion Integration: Native `ExecutionPlan` implementation with partition-parallel execution
- Mosdepth-Compatible: Fast-mode behavior matching mosdepth (no mate-pair overlap deduplication)
- Binary CIGAR: Zero-copy binary CIGAR processing — walks packed 4-byte ops directly from BAM, no string parsing
- Dense Accumulation: Mosdepth-style flat `i32[]` depth arrays with O(1) writes and streaming per-contig emission
- Interval Join: SQL joins with range overlap conditions, automatically optimized via a custom physical planner
- Coverage: Base-pair overlap depth between two interval sets via `SELECT * FROM coverage('reads', 'targets')`
- Count Overlaps: Count overlapping intervals per region via `SELECT * FROM count_overlaps('reads', 'targets')`
- Nearest: Nearest-neighbor interval matching via SQL join (`CoitreesNearest`) or the `nearest(...)` table function
- Overlap: Find all pairs of overlapping intervals between two tables via `SELECT * FROM overlap('reads', 'targets')` — delegates to IntervalJoinExec for optimal performance
- Merge: Merge overlapping intervals within a single table via `SELECT * FROM merge('intervals')` — sweep-line algorithm with configurable `min_dist` and `strict`/`weak` modes
- Cluster: Annotate intervals with cluster membership (cluster ID + boundaries) via `SELECT * FROM cluster('intervals')` — groups overlapping/nearby intervals with configurable `min_dist`
- Complement: Find gaps (uncovered regions) relative to an optional chromsizes view via `SELECT * FROM complement('intervals', 'chromsizes')`
- Subtract: Basepair-level set difference via `SELECT * FROM subtract('left', 'right')` — fragments left intervals at right-interval overlap boundaries
- Multiple Algorithms: Coitrees (default), IntervalTree, ArrayIntervalTree, Lapper, SuperIntervals — selectable via `SET bio.interval_join_algorithm`
- Transparent Optimization: Hash/nested-loop joins with range conditions are automatically replaced with interval joins
- `lookup_variants()` Table Function: SQL-based variant annotation against a pre-built VEP cache
- KV Cache Backend: fjall LSM-tree store with zstd dictionary compression for compact on-disk storage
- Match Modes: `exact`, `exact_or_colocated_ids`, `exact_or_vep_existing` (indel-aware relaxed matching)
- `annotate_vep()` Table Function: backend-unified consequence annotation entrypoint (`parquet` or `fjall`) with phased CSQ rollout
- Session Configuration: Tunable parameters via SQL `SET` statements under the `bio.annotation` namespace
```sql
annotate_vep(
  vcf_table,
  cache_source,
  backend
  [, options_json]
)
```

- `vcf_table`: registered input VCF table.
- `cache_source`: backend-specific source path/identifier.
- `backend`: `parquet` or `fjall`.
- `options_json` (optional): backend/runtime options. Current keys:
  - `transcripts_table`: registered table name with transcript intervals/coding context.
  - `exons_table`: registered table name with exon intervals/order.
  - `regulatory_table`: registered table name with regulatory intervals.
  - `motif_table`: registered table name with motif/TFBS intervals.
  - `mirna_table`: registered table name with mature miRNA intervals.
  - `sv_table`: registered table name with structural-event overlaps.
Current phase behavior:
- pass-through VCF rows,
- derive known-variant metadata via `lookup_variants`,
- if transcript/exon context tables are available (auto-discovery or `options_json`), compute transcript-aware SO terms and a ranked `most_severe_consequence`,
- otherwise fall back to the initial cache-hit CSQ placeholder (`sequence_variant`) for matched rows and keep unmatched rows as `NULL`.
Detailed analysis and implementation plan are in:
Feature comparison snapshot:
| Capability | Ensembl-VEP (release 115) | bio-functions-vep (current) |
|---|---|---|
| Known-variant cache lookup | ✅ (`--check_existing`, colocated behavior) | ✅ (`lookup_variants` + match modes) |
| Transcript consequence engine | ✅ (full SO consequence model) | 🚧 transcript/exon phase-2 baseline merged (not full VEP parity) |
| Supported consequence terms | 41/41 | 🚧 41/41 term handlers wired; codon-accurate parity still in progress |
| Most severe consequence output | ✅ | 🚧 ranked SO output when transcript/exon context is available; fallback placeholder otherwise |
| Backend abstraction | Cache/files in VEP ecosystem | ✅ unified API for parquet + fjall entrypoint |
| Native SQL execution in DataFusion | ❌ | ✅ |
| Fjall KV backend | ❌ | ✅ |
Benchmark input copied into this repo:

- `vep-benchmark/data/HG002_chr22.vcf.gz`
- `vep-benchmark/data/HG002_chr22.vcf.gz.tbi`
Run the golden benchmark (samples the first 1000 variants, runs Ensembl VEP 115 in Docker, runs `annotate_vep`, and writes a comparison report):

```bash
cargo run -p datafusion-bio-function-vep --example annotate_vep_golden_bench --release -- \
  vep-benchmark/data/HG002_chr22.vcf.gz \
  /Users/mwiewior/research/data/vep/115_GRCh38_variants.parquet \
  parquet \
  1000 \
  /Users/mwiewior/research/git/polars-bio-vep-benchmark/vep-benchmark/cache \
  vep-benchmark/data/HG002_chr22.vcf.gz \
  /tmp/annotate_vep_golden_bench
```

Output report: `/tmp/annotate_vep_golden_bench/HG002_chr22_1000_comparison_report.txt`
`lookup_variants(vcf_table, cache_table, ...)` supports both backends through the same API:

- Parquet or other scan-capable table providers: uses interval-join execution.
- Fjall `KvCacheTableProvider`: dispatches to direct KV lookup execution.

Both cache backends must expose the same required cache column names: `chrom`, `start`, `end`, `variation_name`, `allele_string`.
```sql
lookup_variants(
  vcf_table,
  cache_table
  [, columns]
  [, match_mode]
  [, extended_probes]
)
```

- `vcf_table`: registered VCF input table name.
- `cache_table`: registered annotation cache table/provider name (Parquet or Fjall).
- `columns` (optional): comma-separated cache columns to project. Default: all cache columns except `chrom`, `start`, `end`, and `source_*`.
- `match_mode` (optional, default `exact`):
  - `exact`: interval overlap + exact allele matching only.
  - `exact_or_colocated_ids`: runs `exact`; for unmatched rows, fills fallback-capable columns (`variation_name`, `somatic`) from co-located overlap IDs.
  - `exact_or_vep_existing`: runs `exact`; for unmatched rows, fills fallback-capable columns from indel-aware, relaxed allele-compatible co-located rows (prefers `rs*` IDs for `variation_name`).
- `extended_probes` (optional, default `false`): enables wider coordinate probing for VEP-style insertion/deletion coordinate encodings.
Example:

```sql
SELECT *
FROM lookup_variants('vcf_data', 'var_cache', 'variation_name,clin_sig', 'exact');
```

Parquet-backed cache table:
```rust
use datafusion::prelude::SessionContext;
use datafusion_bio_function_vep::register_vep_functions;

let ctx = SessionContext::new();
register_vep_functions(&ctx);
ctx.register_parquet("var_cache", "/path/to/variation_cache.parquet", Default::default()).await?;

let df = ctx
    .sql("SELECT * FROM lookup_variants('vcf', 'var_cache', 'variation_name,clin_sig')")
    .await?;
```

Fjall-backed cache provider:
```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use datafusion_bio_function_vep::register_vep_functions;
use datafusion_bio_function_vep::kv_cache::KvCacheTableProvider;

let ctx = SessionContext::new();
register_vep_functions(&ctx);
let kv_cache = KvCacheTableProvider::open("/path/to/variation_fjall")?;
ctx.register_table("var_cache", Arc::new(kv_cache))?;

let df = ctx
    .sql("SELECT * FROM lookup_variants('vcf', 'var_cache', 'variation_name,clin_sig')")
    .await?;
```

Create a full Fjall cache from a Parquet variation cache:
```bash
cargo run -p datafusion-bio-function-vep --example load_cache_full --features kv-cache --release -- \
  /path/to/115_GRCh38_variants.parquet \
  /path/to/variation_fjall \
  8
```

- Third argument is the thread count (`target_partitions` + loader parallelism).
- The output path is recreated if it already exists.
Create a filtered cache (single chromosome) and optionally tune compression:
```bash
cargo run -p datafusion-bio-function-vep --example load_cache --features kv-cache --release -- \
  /path/to/115_GRCh38_variants.parquet \
  /path/to/variation_fjall_chr22 \
  22 \
  8 \
  9 \
  256
```

- Args: `<parquet_path> <fjall_output_path> [chrom_filter] [partitions] [zstd_level] [dict_size_kb]`
- Defaults for `load_cache`: `zstd_level=3`, `dict_size_kb=112`.
Register `AnnotationConfig` on the session to enable SQL-level parameter overrides. If it is not registered, compiled defaults apply.
```sql
-- fjall block cache size in MB (default: 1024)
-- Larger values reduce cold-start latency by caching more LSM block index
-- pages and data blocks in memory.
SET bio.annotation.cache_size_mb = 2048;

-- Zstd compression level for cache writes (default: 3)
-- Higher levels produce smaller caches at the cost of slower writes.
-- Decompression speed is largely unaffected by the level.
-- Level 9 is a good balance for write-once caches.
SET bio.annotation.zstd_level = 9;

-- Zstd dictionary size in KB for cache writes (default: 112)
-- Larger dictionaries can improve the compression ratio.
-- 256 KB is recommended with level 9.
SET bio.annotation.dict_size_kb = 256;
```

| Parameter | Default | Description |
|---|---|---|
| `bio.annotation.cache_size_mb` | `1024` | fjall block cache size in MB for reads |
| `bio.annotation.zstd_level` | `3` | Zstd compression level for cache writes (1–19) |
| `bio.annotation.dict_size_kb` | `112` | Zstd dictionary size in KB for cache writes |
Registration from downstream (e.g., polars-bio):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_bio_function_vep::{AnnotationConfig, register_vep_functions};
use datafusion_bio_function_vep::config::resolve;

// Register the config extension on the session
let config = SessionConfig::new()
    .with_option_extension(AnnotationConfig::default());
let ctx = SessionContext::new_with_config(config);
register_vep_functions(&ctx);

// Read resolved values (SQL overrides or defaults)
let ann = resolve(&ctx);
```

Add the crates you need to your `Cargo.toml`:
```toml
[dependencies]
datafusion = "50.3.0"
datafusion-bio-function-pileup = { git = "https://github.com/biodatageeks/datafusion-bio-functions" }
datafusion-bio-function-ranges = { git = "https://github.com/biodatageeks/datafusion-bio-functions" }
```

The simplest way to compute coverage is via the SQL `depth()` table function:
```rust
use datafusion::prelude::*;
use datafusion_bio_function_pileup::register_pileup_functions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    register_pileup_functions(&ctx);

    // 1-based coordinates (default, matches polars-bio)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam')").await?;
    df.show().await?;

    // 0-based coordinates (explicit)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam', true)").await?;
    df.show().await?;

    // Per-base output: one row per genomic position (like samtools depth -a)
    let df = ctx.sql("SELECT * FROM depth('path/to/alignments.bam', true, true)").await?;
    df.show().await?;

    Ok(())
}
```

The optional arguments control behavior:
```sql
SELECT * FROM depth('file.bam')              -- 1-based, block output (default)
SELECT * FROM depth('file.bam', true)        -- 0-based, block output
SELECT * FROM depth('file.bam', false)       -- 1-based, block output (explicit)
SELECT * FROM depth('file.bam', true, true)  -- 0-based, per-base output
SELECT * FROM depth('file.bam', false, true) -- 1-based, per-base output
```

| Argument | Type | Default | Description |
|---|---|---|---|
| `path` | string | (required) | Path to BAM file |
| `zero_based` | bool | `false` | Use 0-based coordinates |
| `per_base` | bool | `false` | Emit one row per position instead of RLE blocks |
Block output schema:

| Column | Type | Description |
|---|---|---|
| `contig` | Utf8 | Chromosome/contig name |
| `pos_start` | Int32 | Block start position (inclusive) |
| `pos_end` | Int32 | Block end position (inclusive) |
| `coverage` | Int16 | Read depth in this block |

Per-base output schema:

| Column | Type | Description |
|---|---|---|
| `contig` | Utf8 | Chromosome/contig name |
| `pos` | Int32 | Genomic position |
| `coverage` | Int16 | Read depth at this position |
Per-base mode emits one row for every genomic position in each contig (including 0-coverage positions), similar to `samtools depth -a`. It requires dense mode (a BAM header with contig lengths) and uses a lazy `PerBaseEmitter` that streams positions in batch-sized chunks, keeping memory at O(batch_size).

Both schemas include the metadata key `bio.coordinate_system_zero_based` (`"true"` or `"false"`) so downstream consumers can interpret the coordinate system.
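Downstream code can branch on that key before interpreting positions. A minimal stdlib-only sketch of the consumer-side check — the helper name is illustrative, and in a real consumer the string map would come from `metadata()` on the Arrow schema rather than a plain `HashMap`:

```rust
use std::collections::HashMap;

// Illustrative helper: decide coordinate interpretation from schema metadata.
// In practice the map comes from the Arrow Schema's metadata(); a plain
// HashMap keeps this sketch self-contained.
fn is_zero_based(metadata: &HashMap<String, String>) -> bool {
    metadata
        .get("bio.coordinate_system_zero_based")
        .map(|v| v == "true")
        .unwrap_or(false) // treat a missing key as 1-based (the depth() default)
}

fn main() {
    let meta = HashMap::from([(
        "bio.coordinate_system_zero_based".to_string(),
        "true".to_string(),
    )]);
    println!("zero-based: {}", is_zero_based(&meta)); // prints "zero-based: true"
}
```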
Create a bio-configured session and write standard SQL — range overlap joins are automatically optimized:
```rust
use datafusion_bio_function_ranges::create_bio_session;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = create_bio_session();

    // Register your tables (CSV, Parquet, or any DataFusion table provider)
    ctx.register_csv("reads", "reads.csv", Default::default()).await?;
    ctx.register_csv("targets", "targets.csv", Default::default()).await?;

    // Range overlap joins are automatically optimized as interval joins
    let df = ctx.sql("
        SELECT *
        FROM reads
        JOIN targets
        ON reads.contig = targets.contig
        AND reads.pos_start <= targets.pos_end
        AND reads.pos_end >= targets.pos_start
    ").await?;
    df.show().await?;

    Ok(())
}
```

```sql
-- Count overlapping intervals per target region
SELECT * FROM count_overlaps('reads', 'targets')

-- Base-pair coverage depth
SELECT * FROM coverage('reads', 'targets')

-- Custom column names (shared for both tables)
SELECT * FROM coverage('reads', 'targets', 'chrom', 'start', 'end')

-- Separate column names for left and right tables
SELECT * FROM coverage('reads', 'targets', 'chrom', 'start', 'end', 'contig', 'pos_start', 'pos_end')

-- 0-based half-open coordinates
SELECT * FROM count_overlaps('reads', 'targets', 'contig', 'pos_start', 'pos_end', 'strict')

-- Nearest neighbors (left_* and right_* output columns)
SELECT * FROM nearest('targets', 'reads')

-- Top-3 nearest neighbors per right interval, ignoring overlaps
SELECT * FROM nearest('targets', 'reads', 3, false)

-- Default k=1, ignore overlaps
SELECT * FROM nearest('targets', 'reads', false)

-- 0-based half-open coordinates
SELECT * FROM nearest('targets', 'reads', 1, true, 'contig', 'pos_start', 'pos_end', 'strict')

-- All pairs of overlapping intervals (left_* and right_* output columns)
SELECT * FROM overlap('reads', 'targets')

-- Overlap with 0-based half-open coordinates
SELECT * FROM overlap('reads', 'targets', 'contig', 'pos_start', 'pos_end', 'strict')

-- Separate column names for left and right tables
SELECT * FROM overlap('reads', 'targets', 'l_chr', 'l_s', 'l_e', 'r_chr', 'r_s', 'r_e')

-- Merge overlapping intervals within a single table
SELECT * FROM merge('intervals')

-- Merge with minimum distance between intervals
SELECT * FROM merge('intervals', 10)

-- Merge with custom columns and 0-based half-open coordinates
SELECT * FROM merge('intervals', 0, 'contig', 'pos_start', 'pos_end', 'strict')

-- Cluster: annotate intervals with cluster ID and boundaries
SELECT * FROM cluster('intervals')

-- Cluster with minimum distance between clusters
SELECT * FROM cluster('intervals', 5)

-- Cluster with custom columns and 0-based half-open coordinates
SELECT * FROM cluster('intervals', 0, 'contig', 'pos_start', 'pos_end', 'strict')

-- Complement: find gaps (uncovered regions) with the default view (0..MAX per contig)
SELECT * FROM complement('intervals')

-- Complement with a chromsizes view table
SELECT * FROM complement('intervals', 'chromsizes')

-- Complement with custom column names
SELECT * FROM complement('intervals', 'chromsizes', 'chrom', 'start', 'end')

-- Subtract: basepair-level set difference
SELECT * FROM subtract('left_table', 'right_table')

-- Subtract with custom columns and strict mode
SELECT * FROM subtract('left_table', 'right_table', 'chrom', 'start', 'end', 'strict')
```

`nearest()` accepted forms:

- `nearest('left', 'right')`
- `nearest('left', 'right', k)`
- `nearest('left', 'right', overlap_bool)`
- `nearest('left', 'right', k, overlap_bool)`
- optional shared or separate column names, with an optional trailing `'strict'`|`'weak'`
```sql
SET bio.interval_join_algorithm = CoitreesNearest;

-- Returns exactly one match per right-side row:
-- the overlapping interval if one exists, otherwise the nearest by distance;
-- if no keyed candidate exists, left columns are NULL
SELECT *
FROM targets
JOIN reads
ON targets.contig = reads.contig
AND targets.pos_start <= reads.pos_end
AND targets.pos_end >= reads.pos_start
```

```sql
SET bio.interval_join_algorithm = Coitrees;          -- default, best general performance
SET bio.interval_join_algorithm = IntervalTree;      -- rust-bio interval tree
SET bio.interval_join_algorithm = ArrayIntervalTree; -- rust-bio array-backed
SET bio.interval_join_algorithm = Lapper;            -- rust-lapper
SET bio.interval_join_algorithm = SuperIntervals;    -- SIMD-optimized (AVX2/NEON)
```

For more control, use `PileupExec` directly:
```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion_bio_format_bam::table_provider::BamTableProvider;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register BAM file as a table (zero_based=false for 1-based coordinates)
    let table = BamTableProvider::new("path/to/alignments.bam", None, false, None, true).await?;
    let ctx = SessionContext::new();
    ctx.register_table("reads", Arc::new(table))?;

    // Create physical plan and wrap with PileupExec
    let df = ctx.table("reads").await?;
    let plan = df.create_physical_plan().await?;
    let pileup = PileupExec::new(plan, PileupConfig::default()); // zero_based=false by default

    // Execute and collect results
    let task_ctx = ctx.task_ctx();
    let stream = pileup.execute(0, task_ctx)?;
    let batches: Vec<_> = stream.collect::<Vec<_>>().await
        .into_iter()
        .filter_map(|r| r.ok())
        .collect();
    for batch in &batches {
        println!("{:?}", batch);
    }

    Ok(())
}
```

Coverage computation automatically filters reads using SAM flags (default: exclude unmapped, secondary, failed-QC, and duplicate reads) and a minimum mapping quality:
```rust
use datafusion_bio_function_pileup::ReadFilter;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};

let config = PileupConfig {
    filter: ReadFilter {
        filter_flag: 1796,       // unmapped | secondary | failed_qc | duplicate
        min_mapping_quality: 20, // minimum MAPQ
    },
    ..PileupConfig::default() // zero_based=false, binary_cigar=true, dense_mode=Auto
};
let pileup = PileupExec::new(plan, config);
```

This crate is designed to be consumed by downstream libraries such as polars-bio. This section documents the key APIs and configuration options.
Pin to a specific commit to ensure reproducible builds:
```toml
[dependencies]
datafusion-bio-function-pileup = { git = "https://github.com/biodatageeks/datafusion-bio-functions", rev = "<commit-hash>" }
datafusion-bio-format-bam = { git = "https://github.com/biodatageeks/datafusion-bio-formats", rev = "<commit-hash>" }
```

Keep the datafusion-bio-formats and datafusion-bio-functions revisions in sync — they must use the same DataFusion/Arrow versions (currently DataFusion 50.3.0, Arrow 56.x).
| Feature | Default | Description |
|---|---|---|
| `bam` | yes | Enables the `table_function` module (`DepthFunction`, `register_pileup_functions`), which depends on datafusion-bio-format-bam |
Downstream libraries that provide their own TableProvider (e.g., polars-bio) can disable the default feature to avoid pulling in datafusion-bio-format-bam as a runtime dependency:
```toml
[dependencies]
datafusion-bio-function-pileup = { git = "...", rev = "...", default-features = false }
```

The core pileup API (`PileupExec`, `PileupConfig`, `cigar::*`, `events::*`, `coverage::*`) is always available regardless of features.
The BAM reader accepts a `binary_cigar` parameter:

```rust
BamTableProvider::new(
    file_path,    // String: path to BAM file (local or object-store URL)
    storage_opts, // Option<ObjectStorageOptions>: S3/GCS/Azure credentials
    zero_based,   // bool: false for 1-based (default), true for 0-based
    tag_fields,   // Option<Vec<String>>: optional BAM tag fields to include
    binary_cigar, // bool: true for zero-copy binary CIGAR (recommended)
).await?
```

When `binary_cigar=true`, the CIGAR column has `DataType::Binary` (packed LE u32 ops) instead of `DataType::Utf8` (a string like "10M2I5M"). The pileup crate auto-detects the format from the schema.
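The packed encoding follows the SAM/BAM specification: each op is a little-endian u32 whose low 4 bits index the op table `MIDNSHP=X` and whose upper 28 bits carry the length. A standalone decoding sketch (the crate's internal zero-copy walker is not shown; `decode_cigar` is an illustrative name):

```rust
// BAM op table per the SAM/BAM spec: op code 0 = 'M', 1 = 'I', 2 = 'D', ...
const CIGAR_OPS: &[u8; 9] = b"MIDNSHP=X";

// Decode packed BAM CIGAR bytes (one little-endian u32 per op) into
// (length, op-char) pairs. Assumes valid op codes (0..=8).
fn decode_cigar(raw: &[u8]) -> Vec<(u32, char)> {
    raw.chunks_exact(4)
        .map(|c| {
            let v = u32::from_le_bytes([c[0], c[1], c[2], c[3]]);
            (v >> 4, CIGAR_OPS[(v & 0xF) as usize] as char)
        })
        .collect()
}

fn main() {
    // "10M2I5M" encoded as three packed little-endian ops
    let raw: Vec<u8> = [(10u32 << 4) | 0, (2 << 4) | 1, (5 << 4) | 0]
        .iter()
        .flat_map(|v| v.to_le_bytes())
        .collect();
    println!("{:?}", decode_cigar(&raw)); // [(10, 'M'), (2, 'I'), (5, 'M')]
}
```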
`PileupConfig::default()` uses the fastest settings:

| Field | Default | Description |
|---|---|---|
| `zero_based` | `false` | 1-based coordinates (matches polars-bio default) |
| `per_base` | `false` | RLE block output; set `true` for one-row-per-position |
| `binary_cigar` | `true` | Use binary CIGAR (zero-copy, no string parsing) |
| `dense_mode` | `Auto` (dense) | Dense `i32[]` array accumulation with streaming emission |
| `filter.filter_flag` | `1796` | Exclude unmapped, secondary, failed QC, duplicate |
| `filter.min_mapping_quality` | `0` | No MAPQ threshold |
To override specific fields:
```rust
let config = PileupConfig {
    zero_based: true,               // 0-based coordinates
    per_base: true,                 // one row per position (requires dense mode)
    binary_cigar: false,            // force string CIGAR (e.g., for debugging)
    dense_mode: DenseMode::Disable, // force sparse BTreeMap accumulation
    filter: ReadFilter {
        min_mapping_quality: 20,
        ..ReadFilter::default()
    },
    ..PileupConfig::default()
};
```

| Strategy | Best For | Memory | How |
|---|---|---|---|
| Dense (default) | WGS, high-coverage | One contig at a time | Flat `i32[]` indexed by position |
| Sparse | WES, targeted panels | Proportional to read count | `BTreeMap<pos, delta>` per contig |

Dense mode requires BAM header metadata (contig lengths) in the schema. If unavailable (e.g., a `MemTable` in tests), it falls back to sparse automatically.
```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion_bio_format_bam::table_provider::BamTableProvider;
use datafusion_bio_function_pileup::physical_exec::{PileupConfig, PileupExec};
use futures::StreamExt;

async fn compute_coverage(bam_path: &str, partitions: usize) -> Vec<RecordBatch> {
    let table = BamTableProvider::new(
        bam_path.to_string(), None, false, None, true, // zero_based=false, binary_cigar=true
    ).await.unwrap();

    let config = SessionConfig::new().with_target_partitions(partitions);
    let ctx = SessionContext::new_with_config(config);
    ctx.register_table("reads", Arc::new(table)).unwrap();

    // Select only the columns needed for coverage (avoids decoding sequence, quality, tags)
    let df = ctx.sql("SELECT chrom, start, flags, cigar, mapping_quality FROM reads")
        .await.unwrap();
    let plan = df.create_physical_plan().await.unwrap();
    let pileup = Arc::new(PileupExec::new(plan, PileupConfig::default()));
    let task_ctx = ctx.task_ctx();

    // Run all partitions concurrently
    let mut handles = Vec::new();
    for partition in 0..pileup.properties().partitioning.partition_count() {
        let pileup = pileup.clone();
        let task_ctx = task_ctx.clone();
        handles.push(tokio::spawn(async move {
            pileup.execute(partition, task_ctx).unwrap()
                .collect::<Vec<_>>().await
                .into_iter()
                .filter_map(|r| r.ok())
                .collect::<Vec<_>>()
        }));
    }

    let mut all_batches = Vec::new();
    for handle in handles {
        all_batches.extend(handle.await.unwrap());
    }
    all_batches
}
```

The coverage algorithm is ported from SeQuiLa and uses an event-based approach:
- CIGAR Processing: Each aligned read's CIGAR generates +1 (start) and -1 (end) events at reference positions. With binary CIGAR (default), packed 4-byte ops are walked directly — no string parsing.
- Dense Accumulation (default): Events are written to a flat `i32[]` array per contig (O(1) writes). Completed contigs are streamed out as soon as the BAM's coordinate-sorted order moves to the next contig — peak memory is one contig at a time.
- Output Generation:
  - Block mode (default): A sweep-line pass computes running coverage and emits run-length-encoded blocks where coverage changes. Zero-coverage gaps are skipped.
  - Per-base mode (`per_base=true`): A lazy `PerBaseEmitter` walks the delta array with a prefix sum, emitting one row per genomic position (including 0-coverage). Batches are streamed one at a time to keep memory at O(batch_size).

The dense path naturally handles complex CIGAR operations (insertions, deletions, skipped regions) and provides excellent cache locality. A sparse BTreeMap fallback is available for targeted sequencing panels where contig-level arrays would be wasteful.
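The event/sweep idea above can be sketched compactly. This toy version uses the sparse `BTreeMap` form with inclusive 1-based intervals; the function name and types are illustrative, not the crate's actual API:

```rust
use std::collections::BTreeMap;

// Toy event-based coverage: each read (start, end) contributes +1 at start and
// -1 just past its inclusive end; a prefix-sum sweep over the sorted delta map
// emits RLE blocks (start, end, depth) wherever coverage changes, skipping
// zero-coverage gaps.
fn coverage_blocks(reads: &[(i32, i32)]) -> Vec<(i32, i32, i32)> {
    let mut deltas: BTreeMap<i32, i32> = BTreeMap::new();
    for &(start, end) in reads {
        *deltas.entry(start).or_insert(0) += 1;   // read starts covering here
        *deltas.entry(end + 1).or_insert(0) -= 1; // inclusive end: drop after it
    }
    let mut blocks = Vec::new();
    let mut depth = 0;
    let mut prev_pos: Option<i32> = None;
    for (&pos, &delta) in &deltas {
        if let Some(p) = prev_pos {
            if depth > 0 {
                blocks.push((p, pos - 1, depth)); // emit block [p, pos-1] at current depth
            }
        }
        depth += delta;
        prev_pos = Some(pos);
    }
    blocks
}

fn main() {
    // Two overlapping reads: [1,5] and [4,8]
    let blocks = coverage_blocks(&[(1, 5), (4, 8)]);
    println!("{:?}", blocks); // [(1, 3, 1), (4, 5, 2), (6, 8, 1)]
}
```

The real dense path replaces the map with a flat per-contig `i32[]`, which trades memory for O(1) event writes and better cache locality.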
The coverage output has been validated position-by-position against samtools depth -a on a 2.0 GB WES BAM file (NA12878 chr1, 19.1M reads, 247M genomic positions). Both block and per-base output modes produce zero mismatches across all 247,249,719 positions.
1. Generate reference output with samtools:

```bash
samtools depth -a /path/to/alignments.bam > /tmp/samtools_depth.tsv
```

2. Generate our output using the `dump_coverage` example:

```bash
# Block output (BED format: contig, start, end_exclusive, coverage)
cargo run --release --example dump_coverage -- /path/to/alignments.bam --zero-based > /tmp/our_blocks.bed

# Per-base output (contig, pos_0based, coverage)
cargo run --release --example dump_coverage -- /path/to/alignments.bam --zero-based --per-base > /tmp/our_per_base.tsv
```

3. Compare with the provided script:

```bash
# Compare both outputs against samtools
python3 datafusion/bio-function-pileup/scripts/compare_samtools.py \
  /tmp/samtools_depth.tsv \
  --per-base /tmp/our_per_base.tsv \
  --blocks /tmp/our_blocks.bed

# Compare only block output
python3 datafusion/bio-function-pileup/scripts/compare_samtools.py \
  /tmp/samtools_depth.tsv \
  --blocks /tmp/our_blocks.bed
```

4. Verify multi-partition consistency:

The `bench_coverage` example reports block counts. These should be identical regardless of `--partitions`:

```bash
cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 1
cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 8
cargo run --release --example bench_coverage -- /path/to/alignments.bam --partitions 16
```

| Output mode | Positions compared | Mismatches |
|---|---|---|
| Block (expanded to per-base) | 247,249,719 | 0 |
| Per-base | 247,249,719 | 0 |

| Partitions | Block count | Match? |
|---|---|---|
| 1 | 7,658,496 | Yes |
| 4 | 7,658,496 | Yes |
| 8 | 7,658,496 | Yes |
| 16 | 7,658,496 | Yes |
```bash
cargo build
cargo test --all
cargo fmt --all
cargo clippy --all-targets --all-features -- -D warnings
cargo doc --no-deps --all-features --open
```

- Rust 1.91.0 or later (specified in `rust-toolchain.toml`)
- Git (for building crates with git dependencies)
- datafusion-bio-formats - DataFusion table providers for bioinformatics file formats
- polars-bio - Polars extensions for bioinformatics
- SeQuiLa - Original Scala implementation of the pileup algorithm
Licensed under Apache-2.0. See LICENSE for details.
Contributions are welcome! Please feel free to submit issues or pull requests.
This project builds upon:
- Apache DataFusion - Fast, extensible query engine
- mosdepth - Test data and reference implementation for coverage validation
- SeQuiLa - Original pileup and interval join algorithm design
- coitrees - Cache-oblivious interval tree (default algorithm)
- superintervals - SIMD-optimized interval tree (vendored, MIT license)