Apache DataFusion table providers for bioinformatics file formats, enabling SQL queries on genomic data.
This workspace provides a collection of Rust crates that implement the DataFusion `TableProvider` interface for various bioinformatics file formats. Query your genomic data using SQL through Apache DataFusion's query engine.
| Crate | Description | Predicate Pushdown | Projection Pushdown | Multi-threaded | Write Support | Status |
|---|---|---|---|---|---|---|
| datafusion-bio-format-core | Core utilities and object storage support | N/A | N/A | N/A | N/A | ✅ |
| datafusion-bio-format-fastq | FASTQ sequencing reads | ❌ | ✅ | ✅ (BGZF + uncompressed) | ✅ | ✅ |
| datafusion-bio-format-vcf | VCF genetic variants | ✅ (TBI/CSI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-bam | BAM/SAM sequence alignments | ✅ (BAI/CSI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-bed | BED genomic intervals | ❌ | ❌ | ❌ | ❌ | ✅ |
| datafusion-bio-format-gff | GFF genome annotations | ✅ | ✅ | ✅ (BGZF) | ❌ | ✅ |
| datafusion-bio-format-fasta | FASTA biological sequences | ❌ | ❌ | ❌ | ❌ | ✅ |
| datafusion-bio-format-cram | CRAM compressed alignments | ✅ (CRAI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-pairs | Pairs (Hi-C) chromatin contacts | ✅ (TBI/px2) | ✅ | ✅ (indexed) | ❌ | ✅ |
| datafusion-bio-format-ensembl-cache | Raw Ensembl VEP cache entities | ✅ (basic chr/start/end) | ✅ | ✅ (streaming) | ❌ | 🚧 |
- High Performance: Index-based random access with balanced partitioning across genomic regions
- Predicate Pushdown: SQL `WHERE` clauses on genomic coordinates use BAI/CRAI/TBI indexes to skip irrelevant data
- Projection Pushdown: `SELECT` column lists are pushed down to the parser — only requested fields are extracted from each record, skipping expensive parsing of unreferenced columns
- Cloud Native: Built-in support for GCS, S3, and Azure Blob Storage
- SQL Interface: Query genomic data using familiar SQL syntax
- Constant Memory: Streaming I/O with backpressure keeps memory usage constant regardless of file size
- DataFusion Integration: Seamless integration with Apache DataFusion ecosystem
- Write Support: Export query results to BAM/SAM, CRAM, FASTQ, and VCF files with compression
Add the crates you need to your `Cargo.toml`:

```toml
[dependencies]
datafusion = "50.3.0"

# Choose the formats you need:
datafusion-bio-format-fastq = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
datafusion-bio-format-vcf = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
datafusion-bio-format-bam = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
# ... etc
```

Query a FASTQ file with SQL:

```rust
use datafusion::prelude::*;
use datafusion_bio_format_fastq::FastqTableProvider;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register a FASTQ file as a table
    // Parallelism is automatic: BGZF with GZI index and uncompressed files
    // are read in parallel partitions; GZIP files are read sequentially.
    let table = FastqTableProvider::new("data/sample.fastq.bgz".to_string(), None)?;
    ctx.register_table("sequences", Arc::new(table))?;

    // Query with SQL
    let df = ctx.sql("
        SELECT name, sequence, quality_scores
        FROM sequences
        WHERE LENGTH(sequence) > 100
        LIMIT 10
    ").await?;

    df.show().await?;
    Ok(())
}
```

Query a VCF file, selecting the INFO fields to expose as columns:

```rust
use datafusion::prelude::*;
use datafusion_bio_format_vcf::table_provider::VcfTableProvider;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register a VCF file
    let table = VcfTableProvider::new(
        "data/variants.vcf.gz".to_string(),
        Some(vec!["AF".to_string()]), // INFO fields to include
        None,                         // FORMAT fields
        None,                         // object_storage_options
        true,                         // coordinate_system_zero_based
    )?;
    ctx.register_table("variants", Arc::new(table))?;

    // Query variants
    let df = ctx.sql("
        SELECT chrom, pos, ref, alt, info_af
        FROM variants
        WHERE chrom = 'chr1' AND info_af > 0.01
    ").await?;

    df.show().await?;
    Ok(())
}
```

For multi-sample VCFs, FORMAT fields are stored in a columnar `genotypes: Struct<GT: List<Utf8>, DP: List<Int32>, ...>` layout. Register the built-in UDFs to run bcftools-style analytical pipelines in SQL:

```rust
use datafusion_bio_format_vcf::register_vcf_udfs;

// Register analytical UDFs: list_avg, list_gte, list_lte, list_and, vcf_set_gts
register_vcf_udfs(&ctx);
```

```sql
-- Filter variants by average genotype quality across all samples
SELECT chrom, start, qual, list_avg(genotypes."GQ") AS avg_gq
FROM variants
WHERE qual >= 20
AND list_avg(genotypes."GQ") >= 15
AND list_avg(genotypes."DP") BETWEEN 15 AND 150;

-- Set genotypes to missing where per-sample DP or GQ are too low
SELECT chrom, start, "ref", alt,
       vcf_set_gts(genotypes."GT",
                   list_and(list_gte(genotypes."GQ", 10),
                            list_and(list_gte(genotypes."DP", 10), list_lte(genotypes."DP", 200)))
       ) AS masked_gt
FROM variants;
```

A companion `{table}_long` view can be auto-registered for per-sample lookups:

```rust
use datafusion_bio_format_vcf::auto_register_vcf_long_view;

auto_register_vcf_long_view(&ctx, "variants").await?;
```

```sql
SELECT sample_id, "GT", "DP" FROM variants_long WHERE sample_id = 'NA12878';
```

See the VCF crate README for full documentation on the columnar schema, UDF reference, and query patterns.

Query a Pairs (Hi-C) file:

```rust
use datafusion::prelude::*;
use datafusion_bio_format_pairs::table_provider::PairsTableProvider;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register a Pairs file (auto-discovers .tbi/.px2 index)
    let table = PairsTableProvider::new(
        "data/contacts.pairs.gz".to_string(),
        None,  // object_storage_options
        false, // coordinate_system_zero_based (pairs uses 1-based)
    )?;
    ctx.register_table("contacts", Arc::new(table))?;

    // Query Hi-C contacts — chr1 filter uses the tabix index,
    // chr2 filter is applied as a residual post-read filter
    let df = ctx.sql("
        SELECT chr1, pos1, chr2, pos2, strand1, strand2
        FROM contacts
        WHERE chr1 = 'chr1' AND chr2 = 'chr2'
    ").await?;

    df.show().await?;
    Ok(())
}
```

Files can also be read directly from cloud object storage:

```rust
use datafusion_bio_format_core::object_storage::ObjectStorageOptions;

// Works with GCS, S3, Azure
let opts = ObjectStorageOptions { ... };

let table = FastqTableProvider::new(
    "gs://my-bucket/sample.fastq.gz".to_string(),
    Some(opts),
)?;
```

BAM/SAM, CRAM, FASTQ, and VCF formats support writing DataFusion query results back to files using the `INSERT OVERWRITE` SQL syntax or the `insert_into()` API.
The write implementation follows DataFusion's standard patterns:

```
TableProvider.insert_into()
└── WriteExec (ExecutionPlan consuming RecordBatches)
    ├── Serializer (Arrow → format conversion)
    └── LocalWriter (compression handling)
```
Key features:
- Compression: Auto-detected from file extension (`.bgz`/`.bgzf` → BGZF, `.gz` → GZIP, else plain)
- BGZF default: Block-gzipped format recommended for bioinformatics (allows random access)
- Coordinate conversion: Automatic 0-based ↔ 1-based conversion for VCF files
- Mode: OVERWRITE only (creates/replaces file)
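
As a rough illustration of extension-based detection (the `Compression` enum and `detect_compression` function below are hypothetical, not the crates' API):

```rust
/// Illustrative only — the real crates implement their own detection.
#[derive(Debug, PartialEq)]
enum Compression {
    Bgzf,
    Gzip,
    Plain,
}

fn detect_compression(path: &str) -> Compression {
    if path.ends_with(".bgz") || path.ends_with(".bgzf") {
        Compression::Bgzf
    } else if path.ends_with(".gz") {
        Compression::Gzip
    } else {
        Compression::Plain
    }
}

fn main() {
    assert_eq!(detect_compression("out.fastq.bgz"), Compression::Bgzf);
    assert_eq!(detect_compression("out.vcf.gz"), Compression::Gzip);
    assert_eq!(detect_compression("out.vcf"), Compression::Plain);
}
```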

Filter a FASTQ file and write the result to a new file:

```rust
use datafusion::prelude::*;
use datafusion_bio_format_fastq::FastqTableProvider;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register source FASTQ
    let source = FastqTableProvider::new("input.fastq.gz".to_string(), None)?;
    ctx.register_table("source", Arc::new(source))?;

    // Register destination FASTQ (compression auto-detected from extension)
    let dest = FastqTableProvider::new("output.fastq.bgz".to_string(), None)?;
    ctx.register_table("dest", Arc::new(dest))?;

    // Filter and write to new file
    ctx.sql("
        INSERT OVERWRITE dest
        SELECT name, description, sequence, quality_scores
        FROM source
        WHERE LENGTH(sequence) >= 100
    ").await?.collect().await?;

    Ok(())
}
```

Filter variants and write them to a new VCF:

```rust
use datafusion::prelude::*;
use datafusion_bio_format_vcf::VcfTableProvider;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register source VCF with INFO fields
    let source = VcfTableProvider::try_new(
        "input.vcf.gz",
        Some(vec!["AF".to_string(), "DP".to_string()]), // INFO fields
        None, // FORMAT fields
        true, // coordinate_system_zero_based
    ).await?;
    ctx.register_table("variants", Arc::new(source))?;

    // Register destination VCF
    let dest = VcfTableProvider::try_new(
        "filtered.vcf.bgz",
        Some(vec!["AF".to_string(), "DP".to_string()]),
        None,
        true,
    ).await?;
    ctx.register_table("output", Arc::new(dest))?;

    // Filter variants and write
    ctx.sql("
        INSERT OVERWRITE output
        SELECT chrom, start, end, id, ref, alt, qual, filter, AF, DP
        FROM variants
        WHERE AF > 0.01 AND DP >= 10
    ").await?.collect().await?;

    Ok(())
}
```

VCF files use 1-based coordinates, but the table provider can expose them as 0-based half-open intervals (common in bioinformatics tools like BED):
| Setting | Read Behavior | Write Behavior |
|---|---|---|
| `coordinate_system_zero_based=true` | VCF POS 100 → DataFrame start=99 | DataFrame start=99 → VCF POS 100 |
| `coordinate_system_zero_based=false` | VCF POS 100 → DataFrame start=100 | DataFrame start=100 → VCF POS 100 |
The same setting controls both reading and writing, ensuring round-trip consistency.
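
The conversion itself is simple arithmetic; a minimal illustration (not the provider's internal code):

```rust
/// Illustrative only: 1-based, inclusive VCF POS ↔ 0-based, half-open start.
fn vcf_pos_to_zero_based_start(pos: u64) -> u64 {
    pos - 1 // POS 100 becomes start = 99
}

fn zero_based_start_to_vcf_pos(start: u64) -> u64 {
    start + 1 // start 99 becomes POS = 100
}

fn main() {
    assert_eq!(vcf_pos_to_zero_based_start(100), 99);
    assert_eq!(zero_based_start_to_vcf_pos(99), 100);
}
```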
| Extension | Compression | Notes |
|---|---|---|
| `.vcf`, `.fastq` | Plain | Uncompressed |
| `.vcf.gz`, `.fastq.gz` | GZIP | Standard compression |
| `.vcf.bgz`, `.fastq.bgz` | BGZF | Block-gzipped (recommended) |
| `.vcf.bgzf`, `.fastq.bgzf` | BGZF | Block-gzipped (recommended) |
When reading VCF files, header metadata (field descriptions, types, and numbers) is stored in Arrow field metadata. This enables round-trip read/write operations to preserve the original VCF header information:

```rust
// Reading preserves metadata in schema
let provider = VcfTableProvider::new("input.vcf", ...)?;
let schema = provider.schema();

// Field metadata contains original VCF definitions
let dp_field = schema.field_with_name("DP")?;
let metadata = dp_field.metadata();
// metadata["vcf_description"] = "Total read depth"
// metadata["vcf_type"] = "Integer"
// metadata["vcf_number"] = "1"
```

The writer uses this metadata to reconstruct proper VCF header lines:

```
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total read depth">
```
For write-only operations (new output files), use `VcfTableProvider::new_for_write()`, which accepts the schema directly without reading from a file.
BAM, CRAM, VCF, GFF, and Pairs table providers support index-based predicate pushdown for efficient genomic region queries. When an index file is present alongside the data file, SQL filters on genomic coordinate columns are translated into indexed random access, skipping irrelevant data entirely.
| Data Format | Index Formats | Naming Convention |
|---|---|---|
| BAM | BAI, CSI | sample.bam.bai or sample.bai, sample.bam.csi |
| CRAM | CRAI | sample.cram.crai |
| VCF (bgzf) | TBI, CSI | sample.vcf.gz.tbi, sample.vcf.gz.csi |
| GFF (bgzf) | TBI, CSI | sample.gff.gz.tbi, sample.gff.gz.csi |
| Pairs (bgzf) | TBI, CSI, px2 | sample.pairs.gz.tbi, sample.pairs.gz.px2 |
Index files are auto-discovered — place them alongside the data file and the table provider will find them automatically. No configuration needed.
All standard genomic filter patterns are supported:

```sql
-- Single region query
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start >= 1000000 AND end <= 2000000;

-- Multi-chromosome query (parallel across chromosomes)
SELECT * FROM alignments
WHERE chrom IN ('chr1', 'chr2', 'chr3');

-- Range with BETWEEN
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start BETWEEN 1000000 AND 2000000;

-- Combine genomic region with record-level filters
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start >= 1000000 AND mapping_quality >= 30;
```

How an indexed query is executed:

```
┌────────────────────────────────────────┐
│ SQL: WHERE chrom = 'chr1'              │
│      AND start >= 1000000              │
│      AND mapping_quality >= 30         │
└──────────┬─────────────────────────────┘
           │
    ┌──────▼──────────────────────┐
    │ 1. Extract genomic regions  │  chrom/start/end → index query regions
    │ 2. Separate residual        │  mapping_quality → post-read filter
    └──────┬──────────────────────┘
           │
    ┌──────▼──────────────────────┐
    │ 3. Estimate region sizes    │  Read index metadata (BAI/CRAI/TBI)
    │    from index               │  to estimate compressed bytes per region
    └──────┬──────────────────────┘
           │
    ┌──────▼──────────────────────┐
    │ 4. Balance partitions       │  Greedy bin-packing distributes regions
    │    (target_partitions)      │  across N balanced partitions
    └──────┬──────────────────────┘
           │
    ┌──────▼──────────────────────┐
    │ 5. Per-partition (streaming)│
    │   For each assigned region: │  Sequential within partition
    │     IndexedReader.query()   │  Seek directly via BAI/CRAI/TBI
    │     → apply residual filter │  mapping_quality >= 30
    │     → stream RecordBatches  │  via channel(2) backpressure
    └─────────────────────────────┘
```
Partitioning behavior (with contig lengths known):*
| Index Available? | SQL Filters | Partitions |
|---|---|---|
| Yes | `chrom = 'chr1' AND start >= 1000` | up to `target_partitions` (region split into sub-regions) |
| Yes | `chrom IN ('chr1', 'chr2')` | up to `target_partitions` (both regions split to fill bins) |
| Yes | `mapping_quality >= 30` (no genomic filter) | up to `target_partitions` (all chroms balanced + split) |
| Yes | None (full scan) | up to `target_partitions` (all chroms balanced + split) |
| No | Any | 1 (sequential full scan) |
Partitioning behavior (without contig lengths):*
| Index Available? | SQL Filters | Partitions |
|---|---|---|
| Yes | `chrom = 'chr1' AND start >= 1000` | 1 (cannot split without contig length) |
| Yes | `chrom IN ('chr1', 'chr2')` | min(2, `target_partitions`) |
| Yes | `mapping_quality >= 30` (no genomic filter) | min(N chroms, `target_partitions`) |
| Yes | None (full scan) | min(N chroms, `target_partitions`) |
| No | Any | 1 (sequential full scan) |
\*When contig lengths are known, the balancer splits large regions into sub-regions to fill all `target_partitions` bins, achieving full parallelism even with few contigs. Without contig lengths, parallelism is capped at the number of queried contigs.
| Format | Contig lengths available? | Single-contig parallelism? |
|---|---|---|
| BAM | Always (from header) | Yes |
| CRAM | Always (from header) | Yes |
| VCF | When header has `##contig=<...,length=N>` | Depends on file |
| GFF | Typically not available | No |
| Pairs | When header has `#chromsize:` lines | Depends on file |
When an index exists, regions are distributed across target_partitions using a greedy bin-packing algorithm that estimates data volume from the index. This ensures balanced work distribution even when chromosomes have vastly different sizes (e.g., chr1 at ~249Mb vs chrY at ~57Mb).
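
As an illustration of the greedy strategy (the `Region` type and `balance` function below are hypothetical, not the crates' internal API):

```rust
/// Illustrative greedy bin-packing: assign each region to the currently
/// lightest partition. In the real providers, sizes come from index metadata.
#[derive(Debug, Clone)]
struct Region {
    name: String,         // hypothetical, e.g. "chr1:0-125000000"
    estimated_bytes: u64, // estimated from BAI/CRAI/TBI chunk offsets
}

fn balance(mut regions: Vec<Region>, target_partitions: usize) -> Vec<Vec<Region>> {
    // Largest regions first, so big chunks are spread out early.
    regions.sort_by(|a, b| b.estimated_bytes.cmp(&a.estimated_bytes));

    let n = target_partitions.min(regions.len()).max(1);
    let mut bins: Vec<Vec<Region>> = vec![Vec::new(); n];
    let mut totals = vec![0u64; n];

    for region in regions {
        // Pick the bin with the smallest current total.
        let (i, _) = totals.iter().enumerate().min_by_key(|(_, t)| **t).unwrap();
        totals[i] += region.estimated_bytes;
        bins[i].push(region);
    }
    bins
}
```

When contig lengths are known, the providers additionally split any region larger than roughly 1.5x the ideal share into sub-regions before packing, which is how a single large chromosome can still fill all partitions.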
Beyond index-based region queries, all formats support record-level predicate evaluation. Filters on columns like mapping_quality, flag, score, or strand are evaluated as each record is read, filtering early before Arrow RecordBatch construction.
This works with or without an index file:

```sql
-- No index needed — filters applied per-record during sequential scan
SELECT * FROM alignments WHERE mapping_quality >= 30 AND flag & 4 = 0;
```

When an index is available, the query engine uses a three-stage execution model:
1. Size Estimation — Each format reads its index to estimate compressed data volume per region:
| Format | Index | Estimation Method |
|---|---|---|
| BAM | BAI | Min/max compressed byte offsets from bin chunks |
| CRAM | CRAI | Sum of slice_length per reference sequence (exact) |
| VCF | TBI | Min/max compressed byte offsets from bin chunks |
| GFF | TBI/CSI | Min/max compressed byte offsets from bin chunks |
| Pairs | TBI | Min/max compressed byte offsets from bin chunks |
2. Balanced Partitioning — A greedy bin-packing algorithm distributes regions across target_partitions bins:
- Regions sorted by estimated size (descending)
- Each region assigned to the bin with the smallest current total
- Large regions (>1.5x ideal share) are split into sub-regions when contig length is known
- Result: `min(target_partitions, num_regions)` partitions with roughly equal work
3. Streaming I/O — Each partition processes its assigned regions using constant-memory streaming:
- Dedicated OS thread per partition (not tokio's blocking pool)
- `mpsc::channel(2)` provides backpressure between producer and consumer
- At most ~3 RecordBatches in flight per partition (~3-8 MB)
- Total memory: ~3 batches × `target_partitions` (constant regardless of file size)

```
target_partitions = 4

                      ┌──────────────────┐
Partition 0 (thread)  │ chr1:1-125M      │ ─── sequential within partition
                      │ chr1:125M-249M   │     streaming via channel(2)
                      ├──────────────────┤
Partition 1 (thread)  │ chr2             │ ─── concurrent across partitions
                      │ chr3             │     max 4 threads active
                      ├──────────────────┤
Partition 2 (thread)  │ chr4             │
                      │ chr5, chr6       │
                      ├──────────────────┤
Partition 3 (thread)  │ chrX             │
                      │ chrY, chrM       │
                      └──────────────────┘
```
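
The per-partition streaming handoff boils down to a bounded channel between a producer thread and a consumer. The sketch below illustrates the backpressure pattern only (plain strings instead of Arrow RecordBatches, standard-library channel instead of the crates' actual plumbing):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A bounded channel of capacity 2: the producer blocks once two
    // batches are in flight, so memory stays constant.
    let (tx, rx) = mpsc::sync_channel::<String>(2);

    // A dedicated OS thread plays the producer role (one per partition).
    let producer = thread::spawn(move || {
        for i in 0..10 {
            let batch = format!("record batch {i}");
            // Blocks when the channel already holds 2 items (backpressure).
            if tx.send(batch).is_err() {
                break; // consumer dropped the receiver
            }
        }
    });

    // Consumer side (DataFusion's execution stream in the real crates).
    for batch in rx {
        println!("consumed {batch}");
    }

    producer.join().unwrap();
}
```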
Configuration:
Control the number of concurrent partitions via DataFusion's SessionConfig:

```rust
use datafusion::prelude::*;

let config = SessionConfig::new().with_target_partitions(8); // default varies by system
let ctx = SessionContext::new_with_config(config);
```

Create index files using standard bioinformatics tools:

```bash
# BAM: sort and index
samtools sort input.bam -o sorted.bam
samtools index sorted.bam              # creates sorted.bam.bai

# CRAM: sort and index
samtools sort input.cram -o sorted.cram --reference ref.fa
samtools index sorted.cram             # creates sorted.cram.crai

# VCF: sort, compress, and index
bcftools sort input.vcf -Oz -o sorted.vcf.gz
bcftools index -t sorted.vcf.gz        # creates sorted.vcf.gz.tbi

# GFF: sort, compress, and index
(grep "^#" input.gff; grep -v "^#" input.gff | sort -k1,1 -k4,4n) | bgzip > sorted.gff.gz
tabix -p gff sorted.gff.gz             # creates sorted.gff.gz.tbi

# Pairs: sort, compress, and index (chr1=col2, pos1=col3)
(grep "^#" input.pairs; grep -v "^#" input.pairs | sort -k2,2 -k3,3n) | bgzip > sorted.pairs.gz
tabix -s 2 -b 3 -e 3 sorted.pairs.gz   # creates sorted.pairs.gz.tbi
```

When a query selects only a subset of columns, the projection is pushed down to the record parser so that only the requested fields are extracted from each record. This avoids the cost of parsing, allocating, and converting fields that won't appear in the query output.

```
SQL: SELECT chrom, start, mapping_quality FROM alignments

┌──────────────────────────────────┐
│ DataFusion optimizer             │
│ projection = [1, 2, 6]           │  Only 3 of 11 columns needed
└──────────┬───────────────────────┘
           │
┌──────────▼───────────────────────┐
│ Physical exec (e.g. BamExec)     │
│ needs_chrom = true               │  Computed from projection indices
│ needs_start = true               │
│ needs_mapq = true                │
│ needs_name = false ← skipped     │  No allocation, no parsing
│ needs_cigar = false ← skipped    │
│ needs_seq = false ← skipped      │  Expensive fields avoided
│ ...                              │
└──────────┬───────────────────────┘
           │
┌──────────▼───────────────────────┐
│ RecordBatch with 3 columns       │  Only projected arrays built
└──────────────────────────────────┘
```
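
As a rough sketch of how a projection turns into per-field parse flags (the column indices, field names, and `ParseFlags` struct below are hypothetical, not the executors' actual definitions):

```rust
/// Hypothetical column order for an alignment schema.
const NAME: usize = 0;
const CHROM: usize = 1;
const START: usize = 2;
const MAPQ: usize = 6;
const SEQ: usize = 9;

/// Flags telling the record parser which fields to actually extract.
#[derive(Debug)]
struct ParseFlags {
    needs_chrom: bool,
    needs_start: bool,
    needs_mapq: bool,
    needs_name: bool,
    needs_seq: bool,
}

fn flags_from_projection(projection: Option<&[usize]>, num_columns: usize) -> ParseFlags {
    // No projection means "all columns are needed".
    let wants = |idx: usize| projection.map_or(idx < num_columns, |p| p.contains(&idx));
    ParseFlags {
        needs_chrom: wants(CHROM),
        needs_start: wants(START),
        needs_mapq: wants(MAPQ),
        needs_name: wants(NAME),
        needs_seq: wants(SEQ),
    }
}

fn main() {
    // SELECT chrom, start, mapping_quality → projection = [1, 2, 6]
    let flags = flags_from_projection(Some(&[1, 2, 6]), 11);
    assert!(flags.needs_chrom && flags.needs_start && flags.needs_mapq);
    assert!(!flags.needs_name && !flags.needs_seq); // expensive fields skipped
    println!("{flags:?}");
}
```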
| Format | Level | Skipped When Not Projected |
|---|---|---|
| BAM/SAM | Field-level | All 11 core fields + auxiliary tags |
| CRAM | Field-level | All 11 core fields + auxiliary tags |
| VCF | Field-level | 8 core fields + INFO fields + FORMAT fields |
| GFF | Field-level | 8 core fields + attribute parsing |
| FASTQ | Field-level | All 4 fields (name, description, sequence, quality) |
| BED | Batch-level | Projection applied after parsing |
| Pairs | Batch-level | Projection applied after parsing |
| FASTA | Batch-level | Projection applied after parsing |
Field-level means the parser conditionally skips extraction per record — no allocation, no string conversion, no array construction for unrequested columns. This is especially beneficial for expensive fields like sequence, quality_scores, and cigar.
Batch-level means all fields are still parsed but only the projected columns are included in the output RecordBatch.
Empty projections (e.g., `SELECT COUNT(*) FROM table`) are handled correctly — zero-column RecordBatches with accurate row counts are returned, allowing DataFusion to compute aggregates without reading any field data.
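
For reference, a zero-column RecordBatch that still carries a row count can be constructed with arrow-rs along these lines (illustrative, not the crates' code):

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

fn empty_projection_batch(row_count: usize) -> Result<RecordBatch, ArrowError> {
    // No fields and no arrays, but an explicit row count, so that
    // COUNT(*) can be answered without materializing any columns.
    let schema = Arc::new(Schema::empty());
    let options = RecordBatchOptions::new().with_row_count(Some(row_count));
    RecordBatch::try_new_with_options(schema, vec![], &options)
}

fn main() -> Result<(), ArrowError> {
    let batch = empty_projection_batch(42)?;
    assert_eq!(batch.num_columns(), 0);
    assert_eq!(batch.num_rows(), 42);
    Ok(())
}
```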
This project includes a comprehensive benchmark framework to track performance across releases and validate optimizations.

```bash
# Build the benchmark runner
cargo build --release --package datafusion-bio-benchmarks-runner

# Run GFF benchmarks
./target/release/benchmark-runner benchmarks/configs/gff.yml
```

See benchmarks/README.md for detailed documentation on running benchmarks and adding new formats.
Common development commands:

```bash
cargo build --all
cargo test --all
cargo fmt --all
cargo clippy --all-targets --all-features -- -D warnings
cargo doc --no-deps --all-features --open
```

Prerequisites:
- Rust 1.86.0 or later (specified in `rust-toolchain.toml`)
- Git (for building crates with git dependencies)
This project uses a forked version of noodles from biodatageeks/noodles for enhanced support of certain file formats (CRAM, FASTA, VCF, GFF).
See also: polars-bio - Polars extensions for bioinformatics.
Licensed under Apache-2.0. See LICENSE for details.
Contributions are welcome! Please feel free to submit issues or pull requests.
This project builds upon:
- Apache DataFusion - Fast, extensible query engine
- noodles - Bioinformatics I/O library
- OpenDAL - Unified data access layer